From 4d8b045ce131e2dc087ad13b33dedad5b07a9082 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 23 Apr 2024 14:47:57 +0800 Subject: [PATCH 01/12] test: integrate mysql into risedev Signed-off-by: xxchan --- Makefile.toml | 25 +- ci/scripts/e2e-source-test.sh | 18 +- e2e_test/commands/README.md | 8 + e2e_test/commands/internal_table.mjs | 67 ++++++ e2e_test/source/cdc/cdc.create_source_job.slt | 15 -- e2e_test/source/cdc/mysql_cdc.sql | 2 + .../cdc_inline/mysql/mysql_create_drop.slt | 133 ----------- e2e_test/source_inline/README.md | 4 +- .../cdc/mysql/mysql_create_drop.slt | 218 ++++++++++++++++++ risedev.yml | 42 ++++ src/risedevtool/src/bin/risedev-compose.rs | 4 +- src/risedevtool/src/bin/risedev-dev.rs | 101 ++++---- src/risedevtool/src/config.rs | 1 + src/risedevtool/src/risedev_env.rs | 7 + src/risedevtool/src/service_config.rs | 21 ++ .../src/task/task_configure_grpc_node.rs | 10 +- 16 files changed, 461 insertions(+), 215 deletions(-) create mode 100644 e2e_test/commands/README.md create mode 100755 e2e_test/commands/internal_table.mjs delete mode 100644 e2e_test/source/cdc/cdc.create_source_job.slt delete mode 100644 e2e_test/source/cdc_inline/mysql/mysql_create_drop.slt create mode 100644 e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt diff --git a/Makefile.toml b/Makefile.toml index 504ff88a33d5a..9f896adc00b00 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -1308,7 +1308,6 @@ echo "All processes has exited." """ [tasks.slt] -env = { SLT_HOST = "${RISEDEV_RW_FRONTEND_LISTEN_ADDRESS}", SLT_PORT = "${RISEDEV_RW_FRONTEND_PORT}", SLT_DB = "dev" } category = "RiseDev - Test - SQLLogicTest" install_crate = { version = "0.20.1", crate_name = "sqllogictest-bin", binary = "sqllogictest", test_arg = [ "--help", @@ -1319,6 +1318,19 @@ command = "sqllogictest" args = ["${@}"] description = "🌟 Run SQLLogicTest" +[tasks.slt.env] +SLT_HOST = "${RISEDEV_RW_FRONTEND_LISTEN_ADDRESS}" +SLT_PORT = "${RISEDEV_RW_FRONTEND_PORT}" +SLT_DB = "dev" +PATH = "${PWD}/e2e_test/commands:${PATH}" + +[tasks.slt-clean] +category = "RiseDev - Test - SQLLogicTest" +dependencies = ["clean-kafka", "reset-rw"] +description = "Run SQLLogicTest with a clean environment" +run_task = "slt" +args = ["${@}"] + [tasks.slt-streaming] category = "RiseDev - Test - SQLLogicTest" extend = "slt" @@ -1433,3 +1445,14 @@ script = """ vars = dump_variables echo ${vars} """ + +[tasks.show-risedev-env] +description = "Show risedev-env environment variables" +dependencies = ["check-risedev-env-file"] +script = ''' +#!/usr/bin/env bash +set -euo pipefail +cat ${PREFIX_CONFIG}/risedev-env +echo "Hint: To load the environment variables into the shell, you may run:" +echo "$(tput setaf 4)\set -a; source ${PREFIX_CONFIG}/risedev-env; set +a$(tput sgr0)" +''' diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 8a683f56b8550..62dc2842879d2 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -82,15 +82,6 @@ echo "--- cdc share source test" export MYSQL_HOST=mysql MYSQL_TCP_PORT=3306 MYSQL_PWD=123456 risedev slt './e2e_test/source/cdc/cdc.share_stream.slt' -# create a share source and check whether heartbeat message is received -risedev slt './e2e_test/source/cdc/cdc.create_source_job.slt' -table_id=$(psql -U root -h localhost -p 4566 -d dev -t -c "select id from rw_internal_tables where name like '%mysql_source%';" | xargs); -table_count=$(psql -U root -h localhost -p 4566 -d dev -t -c "select count(*) from rw_table(${table_id}, public);" | xargs); -if [ "$table_count" -eq 0 ]; then - echo "ERROR: internal table of cdc share source is empty!" - exit 1 -fi - echo "--- mysql & postgres load and check" risedev slt './e2e_test/source/cdc/cdc.load.slt' # wait for cdc loading @@ -168,9 +159,12 @@ echo "--- e2e, kafka alter source again" ./scripts/source/prepare_data_after_alter.sh 3 risedev slt './e2e_test/source/basic/alter/kafka_after_new_data_2.slt' -echo "--- e2e, inline test" -risedev slt './e2e_test/source_inline/**/*.slt' - echo "--- Run CH-benCHmark" risedev slt './e2e_test/ch_benchmark/batch/ch_benchmark.slt' risedev slt './e2e_test/ch_benchmark/streaming/*.slt' + +echo "--- Kill cluster" +risedev ci-kill +echo "--- e2e, inline test" +risedev ci-start ci-inline-source-test +risedev slt './e2e_test/source_inline/**/*.slt' diff --git a/e2e_test/commands/README.md b/e2e_test/commands/README.md new file mode 100644 index 0000000000000..56f51159d844e --- /dev/null +++ b/e2e_test/commands/README.md @@ -0,0 +1,8 @@ +# "built-in" Sqllogictest system commands + +Scripts (executables) in this directory are expected to be used as `system` commands in sqllogictests. You can use any language like bash, python, zx. + +They will be loaded in `PATH` by `risedev slt`, and thus function as kind of "built-in" commands. + +Only general commands should be put here. +If the script is ad-hoc (only used for one test), it's better to put next to the test file. diff --git a/e2e_test/commands/internal_table.mjs b/e2e_test/commands/internal_table.mjs new file mode 100755 index 0000000000000..b51cc0313dca2 --- /dev/null +++ b/e2e_test/commands/internal_table.mjs @@ -0,0 +1,67 @@ +#!/usr/bin/env zx + +// zx: A tool for writing better scripts +// https://google.github.io/zx/ + +const { + name: job_name, + type: table_type, + count: count, +} = minimist(process.argv.slice(3), { + string: ["name", "type"], + boolean: ["count"], +}); + +// Return an array of CSV string +async function psql(query) { + return ( + await $` +psql -h $RISEDEV_RW_FRONTEND_LISTEN_ADDRESS -p $RISEDEV_RW_FRONTEND_PORT -U root -d dev \ +--csv --tuples-only -c ${query} +` + ) + .toString() + .trim() + .split("\n") + .filter((line) => line.trim() != ""); +} + +// If `table_type` is null, return all internal tables for the job. +// If `job_name` is null, return all jobs' internal tables. +async function select_internal_table(job_name, table_type) { + // Note: if we have `t1`, and `t1_balabala`, the latter one will also be matched 😄. + const internal_tables = await psql( + `select name from rw_internal_tables where name like '__internal_${job_name}_%_${table_type}_%'` + ); + if (internal_tables.length == 0) { + throw new Error( + `No internal tables found for the pattern '__internal_${job_name}_%_${table_type}_%'` + ); + } + + const res = new Map( + await Promise.all( + internal_tables.map(async (t) => { + let rows = await psql(`select * from ${t}`); + return [t, rows]; + }) + ) + ); + return res; +} + +const tables = await select_internal_table(job_name, table_type); +for (const [table_name, rows] of tables) { + if (tables.size > 1) { + console.log(`Table: ${table_name}`); + } + if (count) { + console.log(`count: ${rows.length}`); + } else { + if (rows.length == 0) { + console.log("(empty)"); + } else { + console.log(`${rows.join("\n")}`); + } + } +} diff --git a/e2e_test/source/cdc/cdc.create_source_job.slt b/e2e_test/source/cdc/cdc.create_source_job.slt deleted file mode 100644 index 1f2300087110b..0000000000000 --- a/e2e_test/source/cdc/cdc.create_source_job.slt +++ /dev/null @@ -1,15 +0,0 @@ -control substitution on - -# create a cdc source job, which format fixed to `FORMAT PLAIN ENCODE JSON` -statement ok -create source mysql_source with ( - connector = 'mysql-cdc', - hostname = '${MYSQL_HOST:localhost}', - port = '${MYSQL_TCP_PORT:8306}', - username = 'rwcdc', - password = '${MYSQL_PWD:}', - database.name = 'mytest', - server.id = '5001' -); - -sleep 2s diff --git a/e2e_test/source/cdc/mysql_cdc.sql b/e2e_test/source/cdc/mysql_cdc.sql index 0ac47fc7fbd1b..2c53e57748163 100644 --- a/e2e_test/source/cdc/mysql_cdc.sql +++ b/e2e_test/source/cdc/mysql_cdc.sql @@ -49,9 +49,11 @@ VALUES (1,1,'no'), (3,3,'no'), (4,4,'no'); +-- This user is for non-shared CDC CREATE USER 'dbz'@'%' IDENTIFIED BY '123456'; GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dbz'@'%'; +-- This user is for shared CDC CREATE USER 'rwcdc'@'%' IDENTIFIED BY '123456'; GRANT SELECT, RELOAD, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'rwcdc'@'%'; diff --git a/e2e_test/source/cdc_inline/mysql/mysql_create_drop.slt b/e2e_test/source/cdc_inline/mysql/mysql_create_drop.slt deleted file mode 100644 index 59f7a4538ef2c..0000000000000 --- a/e2e_test/source/cdc_inline/mysql/mysql_create_drop.slt +++ /dev/null @@ -1,133 +0,0 @@ -# create and drop CDC mysql tables concurrently - -control substitution on - -statement ok -ALTER SYSTEM SET max_concurrent_creating_streaming_jobs TO 1; - -system ok -mysql --protocol=tcp -u root -e " - DROP DATABASE IF EXISTS testdb1; CREATE DATABASE testdb1; - USE testdb1; - CREATE TABLE tt1 (v1 int primary key, v2 timestamp); - INSERT INTO tt1 VALUES (1, '2023-10-23 10:00:00'); - CREATE TABLE tt2 (v1 int primary key, v2 timestamp); - INSERT INTO tt2 VALUES (2, '2023-10-23 11:00:00'); - CREATE TABLE tt3 (v1 int primary key, v2 timestamp); - INSERT INTO tt3 VALUES (3, '2023-10-23 12:00:00'); - CREATE TABLE tt4 (v1 int primary key, v2 timestamp); - INSERT INTO tt4 VALUES (4, '2023-10-23 13:00:00'); - CREATE TABLE tt5 (v1 int primary key, v2 timestamp); - INSERT INTO tt5 VALUES (5, '2023-10-23 14:00:00');" - -statement ok -create table tt1 (v1 int, - v2 timestamptz, - PRIMARY KEY (v1) -) with ( - connector = 'mysql-cdc', - hostname = '${MYSQL_HOST:localhost}', - port = '${MYSQL_TCP_PORT:8306}', - username = 'dbz', - password = '${MYSQL_PWD:}', - database.name = 'testdb1', - table.name = 'tt1', -); - -statement ok -create table tt2 (v1 int, - v2 timestamptz, - PRIMARY KEY (v1) -) with ( - connector = 'mysql-cdc', - hostname = '${MYSQL_HOST:localhost}', - port = '${MYSQL_TCP_PORT:8306}', - username = 'dbz', - password = '${MYSQL_PWD:}', - database.name = 'testdb1', - table.name = 'tt2', -); - -statement ok -create table tt3 (v1 int, - v2 timestamptz, - PRIMARY KEY (v1) -) with ( - connector = 'mysql-cdc', - hostname = '${MYSQL_HOST:localhost}', - port = '${MYSQL_TCP_PORT:8306}', - username = 'dbz', - password = '${MYSQL_PWD:}', - database.name = 'testdb1', - table.name = 'tt3', -); - -statement ok -create table tt4 (v1 int, - v2 timestamptz, - PRIMARY KEY (v1) -) with ( - connector = 'mysql-cdc', - hostname = '${MYSQL_HOST:localhost}', - port = '${MYSQL_TCP_PORT:8306}', - username = 'dbz', - password = '${MYSQL_PWD:}', - database.name = 'testdb1', - table.name = 'tt4', -); - -statement ok -create table tt5 (v1 int, - v2 timestamptz, - PRIMARY KEY (v1) -) with ( - connector = 'mysql-cdc', - hostname = '${MYSQL_HOST:localhost}', - port = '${MYSQL_TCP_PORT:8306}', - username = 'dbz', - password = '${MYSQL_PWD:}', - database.name = 'testdb1', - table.name = 'tt5', -); - -sleep 5s - -query IT -select * from tt1; ----- -1 2023-10-23 10:00:00+00:00 - -query IT -select * from tt2; ----- -2 2023-10-23 11:00:00+00:00 - -query IT -select * from tt3; ----- -3 2023-10-23 12:00:00+00:00 - -query IT -select * from tt4; ----- -4 2023-10-23 13:00:00+00:00 - -query IT -select * from tt5; ----- -5 2023-10-23 14:00:00+00:00 - -statement ok -drop table tt1; - -statement ok -drop table tt2; - -statement ok -drop table tt3; - -statement ok -drop table tt4; - -statement ok -drop table tt5; diff --git a/e2e_test/source_inline/README.md b/e2e_test/source_inline/README.md index 3a9070639b8cb..d1a84e22099e5 100644 --- a/e2e_test/source_inline/README.md +++ b/e2e_test/source_inline/README.md @@ -26,5 +26,5 @@ risedev slt 'e2e_test/source_inline/**/*.slt' To write tests, please ensure each file is self-contained and does not depend on running external scripts to setup the environment. Use `system` command to setup instead. -For simple cases, you can directly write a bash command; -For more complex cases, you can write a test script (with any language like bash, python, zx), and invoke it in the `system` command. +- For simple cases, you can directly write a bash command; +- For more complex cases, you can write a test script. See also [e2e_test/commands/README.md](../commands/README.md) diff --git a/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt b/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt new file mode 100644 index 0000000000000..3ca952cc377f2 --- /dev/null +++ b/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt @@ -0,0 +1,218 @@ +# create and drop CDC mysql tables concurrently + +control substitution on + +statement ok +ALTER SYSTEM SET max_concurrent_creating_streaming_jobs TO 1; + +system ok +mysql -e " + DROP DATABASE IF EXISTS testdb1; CREATE DATABASE testdb1; + USE testdb1; + CREATE TABLE tt1 (v1 int primary key, v2 timestamp); + CREATE TABLE tt2 (v1 int primary key, v2 timestamp); + CREATE TABLE tt3 (v1 int primary key, v2 timestamp); + CREATE TABLE tt4 (v1 int primary key, v2 timestamp); + CREATE TABLE tt5 (v1 int primary key, v2 timestamp); +" + +system ok +mysql -e " + DROP USER IF EXISTS 'non-shared-cdc'@'%'; + CREATE USER 'non-shared-cdc'@'%' IDENTIFIED BY '123456'; + GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'non-shared-cdc'@'%'; + # + DROP USER IF EXISTS 'shared-cdc'@'%'; + CREATE USER 'shared-cdc'@'%' IDENTIFIED BY 'abcdef'; + GRANT SELECT, RELOAD, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'shared-cdc'@'%'; + # + FLUSH PRIVILEGES; +" + +statement ok +create source s with ( + ${RISEDEV_MYSQL_WITH_OPTIONS_COMMON}, + username = 'non-shared-cdc', + password = '123456', + database.name = 'testdb1', + server.id = '114514' +); + +sleep 1s + +# There's no data in upstream yet, but the offset is updated in the source executor via the heartbeat message. +system ok +internal_table.mjs --name s --type '' --count +---- +count: 1 + + +system ok +mysql -e " + INSERT INTO tt1 VALUES (1, '2023-10-23 10:00:00'); + INSERT INTO tt2 VALUES (2, '2023-10-23 11:00:00'); + INSERT INTO tt3 VALUES (3, '2023-10-23 12:00:00'); + INSERT INTO tt4 VALUES (4, '2023-10-23 13:00:00'); + INSERT INTO tt5 VALUES (5, '2023-10-23 14:00:00'); +" + +statement ok +create table tt1 (v1 int, + v2 timestamptz, + PRIMARY KEY (v1) +) with ( + ${RISEDEV_MYSQL_WITH_OPTIONS_COMMON}, + username = 'non-shared-cdc', + password = '123456', + database.name = 'testdb1', + table.name = 'tt1', +); + +statement ok +create table tt2 (v1 int, + v2 timestamptz, + PRIMARY KEY (v1) +) with ( + ${RISEDEV_MYSQL_WITH_OPTIONS_COMMON}, + username = 'non-shared-cdc', + password = '123456', + database.name = 'testdb1', + table.name = 'tt2', +); + +statement ok +create table tt3 (v1 int, + v2 timestamptz, + PRIMARY KEY (v1) +) with ( + ${RISEDEV_MYSQL_WITH_OPTIONS_COMMON}, + username = 'non-shared-cdc', + password = '123456', + database.name = 'testdb1', + table.name = 'tt3', +); + +statement ok +create table tt4 (v1 int, + v2 timestamptz, + PRIMARY KEY (v1) +) with ( + ${RISEDEV_MYSQL_WITH_OPTIONS_COMMON}, + username = 'non-shared-cdc', + password = '123456', + database.name = 'testdb1', + table.name = 'tt4', +); + +statement ok +create table tt5 (v1 int, + v2 timestamptz, + PRIMARY KEY (v1) +) with ( + ${RISEDEV_MYSQL_WITH_OPTIONS_COMMON}, + username = 'non-shared-cdc', + password = '123456', + database.name = 'testdb1', + table.name = 'tt5', +); + +statement ok +create table tt1_shared (v1 int, + v2 timestamptz, + PRIMARY KEY (v1) +) from s table 'testdb1.tt1'; + +statement ok +create table tt2_shared (v1 int, + v2 timestamptz, + PRIMARY KEY (v1) +) from s table 'testdb1.tt2'; + +statement ok +create table tt3_shared (v1 int, + v2 timestamptz, + PRIMARY KEY (v1) +) from s table 'testdb1.tt3'; + +statement ok +create table tt4_shared (v1 int, + v2 timestamptz, + PRIMARY KEY (v1) +) from s table 'testdb1.tt4'; + +statement ok +create table tt5_shared (v1 int, + v2 timestamptz, + PRIMARY KEY (v1) +) from s table 'testdb1.tt5'; + +sleep 5s + +query IT +select * from tt1; +---- +1 2023-10-23 10:00:00+00:00 + +query IT +select * from tt2; +---- +2 2023-10-23 11:00:00+00:00 + +query IT +select * from tt3; +---- +3 2023-10-23 12:00:00+00:00 + +query IT +select * from tt4; +---- +4 2023-10-23 13:00:00+00:00 + +query IT +select * from tt5; +---- +5 2023-10-23 14:00:00+00:00 + + +query IT +select * from tt1_shared; +---- +1 2023-10-23 10:00:00+00:00 + +query IT +select * from tt2_shared; +---- +2 2023-10-23 11:00:00+00:00 + +query IT +select * from tt3_shared; +---- +3 2023-10-23 12:00:00+00:00 + +query IT +select * from tt4_shared; +---- +4 2023-10-23 13:00:00+00:00 + +query IT +select * from tt5_shared; +---- +5 2023-10-23 14:00:00+00:00 + +statement ok +drop table tt1; + +statement ok +drop table tt2; + +statement ok +drop table tt3; + +statement ok +drop table tt4; + +statement ok +drop table tt5; + +statement ok +drop source s cascade; diff --git a/risedev.yml b/risedev.yml index 721043ccf7fa2..bfef64e6b598c 100644 --- a/risedev.yml +++ b/risedev.yml @@ -823,6 +823,30 @@ profile: address: message_queue port: 29092 + ci-inline-source-test: + config-path: src/config/ci.toml + steps: + - use: minio + - use: etcd + unsafe-no-fsync: true + - use: meta-node + - use: compute-node + enable-tiered-cache: true + - use: frontend + - use: compactor + - use: pubsub + persist-data: true + - use: kafka + user-managed: true + address: message_queue + port: 29092 + - use: mysql + port: 3306 + address: mysql + user: root + password: 123456 + user-managed: true + ci-redis: config-path: src/config/ci.toml steps: @@ -1406,3 +1430,21 @@ template: # address of redis address: "127.0.0.1" + + # MySQL service. Currently user-managed only + mysql: + # Id to be picked-up by services + id: mysql + + # listen port of mysql + port: 8306 + # address of mysql + address: "127.0.0.1" + + # Note: these configs are not validated by risedev. They are passed as-is to risedev-env + # default user for mysql operations. + # This is not used in RISEDEV_MYSQL_WITH_OPTIONS_COMMON. + user: root + password: "" + + user-managed: true diff --git a/src/risedevtool/src/bin/risedev-compose.rs b/src/risedevtool/src/bin/risedev-compose.rs index 0f3cb5b020fca..c3a7d079e4c4e 100644 --- a/src/risedevtool/src/bin/risedev-compose.rs +++ b/src/risedevtool/src/bin/risedev-compose.rs @@ -222,7 +222,9 @@ fn main() -> Result<()> { volumes.insert(c.id.clone(), ComposeVolume::default()); (c.address.clone(), c.compose(&compose_config)?) } - ServiceConfig::Redis(_) => return Err(anyhow!("not supported")), + ServiceConfig::Redis(_) | ServiceConfig::MySql(_) => { + return Err(anyhow!("not supported")) + } }; compose.container_name = service.id().to_string(); if opts.deploy { diff --git a/src/risedevtool/src/bin/risedev-dev.rs b/src/risedevtool/src/bin/risedev-dev.rs index d5a14cbf8c1bd..47ef0e20341e3 100644 --- a/src/risedevtool/src/bin/risedev-dev.rs +++ b/src/risedevtool/src/bin/risedev-dev.rs @@ -24,11 +24,10 @@ use fs_err::OpenOptions; use indicatif::ProgressBar; use risedev::util::{complete_spin, fail_spin}; use risedev::{ - generate_risedev_env, preflight_check, AwsS3Config, CompactorService, ComputeNodeService, - ConfigExpander, ConfigureTmuxTask, EnsureStopService, ExecuteContext, FrontendService, - GrafanaService, KafkaService, MetaNodeService, MinioService, OpendalConfig, PrometheusService, - PubsubService, RedisService, ServiceConfig, SqliteConfig, Task, TempoService, ZooKeeperService, - RISEDEV_NAME, + generate_risedev_env, preflight_check, CompactorService, ComputeNodeService, ConfigExpander, + ConfigureTmuxTask, EnsureStopService, ExecuteContext, FrontendService, GrafanaService, + KafkaService, MetaNodeService, MinioService, PrometheusService, PubsubService, RedisService, + ServiceConfig, SqliteConfig, Task, TempoService, ZooKeeperService, RISEDEV_NAME, }; use tempfile::tempdir; use thiserror_ext::AsReport; @@ -63,6 +62,33 @@ impl ProgressManager { } } +struct DummyService { + id: String, +} + +impl DummyService { + pub fn new(id: &str) -> Self { + Self { id: id.to_string() } + } +} + +impl Task for DummyService { + fn execute(&mut self, ctx: &mut ExecuteContext) -> anyhow::Result<()> { + ctx.service(self); + writeln!( + &mut ctx.log, + "{} is a dummy service. Please ensure it's correctly configured on your own! 🙏", + self.id + )?; + ctx.complete_spin(); + Ok(()) + } + + fn id(&self) -> String { + self.id.clone() + } +} + fn task_main( manager: &mut ProgressManager, services: &Vec, @@ -136,7 +162,7 @@ fn task_main( // TODO(chi): etcd will set its health check to success only after all nodes are // connected and there's a leader, therefore we cannot do health check for now. let mut task = - risedev::ConfigureGrpcNodeTask::new(c.address.clone(), c.port, false)?; + risedev::ConfigureTcpNodeTask::new(c.address.clone(), c.port, false)?; task.execute(&mut ctx)?; } ServiceConfig::Sqlite(c) => { @@ -173,7 +199,7 @@ fn task_main( let mut service = PrometheusService::new(c.clone())?; service.execute(&mut ctx)?; let mut task = - risedev::ConfigureGrpcNodeTask::new(c.address.clone(), c.port, false)?; + risedev::ConfigureTcpNodeTask::new(c.address.clone(), c.port, false)?; task.execute(&mut ctx)?; ctx.pb .set_message(format!("api http://{}:{}/", c.address, c.port)); @@ -185,7 +211,7 @@ fn task_main( service.execute(&mut ctx)?; let mut task = - risedev::ConfigureGrpcNodeTask::new(c.address.clone(), c.port, c.user_managed)?; + risedev::ConfigureTcpNodeTask::new(c.address.clone(), c.port, c.user_managed)?; task.execute(&mut ctx)?; ctx.pb .set_message(format!("api grpc://{}:{}/", c.address, c.port)); @@ -196,7 +222,7 @@ fn task_main( let mut service = MetaNodeService::new(c.clone())?; service.execute(&mut ctx)?; let mut task = - risedev::ConfigureGrpcNodeTask::new(c.address.clone(), c.port, c.user_managed)?; + risedev::ConfigureTcpNodeTask::new(c.address.clone(), c.port, c.user_managed)?; task.execute(&mut ctx)?; ctx.pb.set_message(format!( "api grpc://{}:{}/, dashboard http://{}:{}/", @@ -209,7 +235,7 @@ fn task_main( let mut service = FrontendService::new(c.clone())?; service.execute(&mut ctx)?; let mut task = - risedev::ConfigureGrpcNodeTask::new(c.address.clone(), c.port, c.user_managed)?; + risedev::ConfigureTcpNodeTask::new(c.address.clone(), c.port, c.user_managed)?; task.execute(&mut ctx)?; ctx.pb .set_message(format!("api postgres://{}:{}/", c.address, c.port)); @@ -231,7 +257,7 @@ fn task_main( let mut service = CompactorService::new(c.clone())?; service.execute(&mut ctx)?; let mut task = - risedev::ConfigureGrpcNodeTask::new(c.address.clone(), c.port, c.user_managed)?; + risedev::ConfigureTcpNodeTask::new(c.address.clone(), c.port, c.user_managed)?; task.execute(&mut ctx)?; ctx.pb .set_message(format!("compactor {}:{}", c.address, c.port)); @@ -242,7 +268,7 @@ fn task_main( let mut service = GrafanaService::new(c.clone())?; service.execute(&mut ctx)?; let mut task = - risedev::ConfigureGrpcNodeTask::new(c.address.clone(), c.port, false)?; + risedev::ConfigureTcpNodeTask::new(c.address.clone(), c.port, false)?; task.execute(&mut ctx)?; ctx.pb .set_message(format!("dashboard http://{}:{}/", c.address, c.port)); @@ -253,7 +279,7 @@ fn task_main( let mut service = TempoService::new(c.clone())?; service.execute(&mut ctx)?; let mut task = - risedev::ConfigureGrpcNodeTask::new(c.listen_address.clone(), c.port, false)?; + risedev::ConfigureTcpNodeTask::new(c.listen_address.clone(), c.port, false)?; task.execute(&mut ctx)?; ctx.pb .set_message(format!("api http://{}:{}/", c.listen_address, c.port)); @@ -261,46 +287,14 @@ fn task_main( ServiceConfig::AwsS3(c) => { let mut ctx = ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone()); - - struct AwsService(AwsS3Config); - impl Task for AwsService { - fn execute( - &mut self, - _ctx: &mut ExecuteContext, - ) -> anyhow::Result<()> { - Ok(()) - } - - fn id(&self) -> String { - self.0.id.clone() - } - } - - ctx.service(&AwsService(c.clone())); - ctx.complete_spin(); + DummyService::new(&c.id).execute(&mut ctx)?; ctx.pb .set_message(format!("using AWS s3 bucket {}", c.bucket)); } ServiceConfig::Opendal(c) => { let mut ctx = ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone()); - - struct OpendalService(OpendalConfig); - impl Task for OpendalService { - fn execute( - &mut self, - _ctx: &mut ExecuteContext, - ) -> anyhow::Result<()> { - Ok(()) - } - - fn id(&self) -> String { - self.0.id.clone() - } - } - - ctx.service(&OpendalService(c.clone())); - ctx.complete_spin(); + DummyService::new(&c.id).execute(&mut ctx)?; ctx.pb .set_message(format!("using Opendal, namenode = {}", c.namenode)); } @@ -310,7 +304,7 @@ fn task_main( let mut service = ZooKeeperService::new(c.clone())?; service.execute(&mut ctx)?; let mut task = - risedev::ConfigureGrpcNodeTask::new(c.address.clone(), c.port, false)?; + risedev::ConfigureTcpNodeTask::new(c.address.clone(), c.port, false)?; task.execute(&mut ctx)?; ctx.pb .set_message(format!("zookeeper {}:{}", c.address, c.port)); @@ -348,6 +342,17 @@ fn task_main( ctx.pb .set_message(format!("redis {}:{}", c.address, c.port)); } + ServiceConfig::MySql(c) => { + let mut ctx = + ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone()); + // TODO: support starting mysql in RiseDev. Currently it's user-managed only + DummyService::new(&c.id).execute(&mut ctx)?; + let mut task = + risedev::ConfigureTcpNodeTask::new(c.address.clone(), c.port, c.user_managed)?; + task.execute(&mut ctx)?; + ctx.pb + .set_message(format!("mysql {}:{}/", c.address, c.port)); + } } let service_id = service.id().to_string(); diff --git a/src/risedevtool/src/config.rs b/src/risedevtool/src/config.rs index d5fe2d8c220a5..45d90daa0c872 100644 --- a/src/risedevtool/src/config.rs +++ b/src/risedevtool/src/config.rs @@ -174,6 +174,7 @@ impl ConfigExpander { "redis" => ServiceConfig::Redis(serde_yaml::from_str(&out_str)?), "zookeeper" => ServiceConfig::ZooKeeper(serde_yaml::from_str(&out_str)?), "redpanda" => ServiceConfig::RedPanda(serde_yaml::from_str(&out_str)?), + "mysql" => ServiceConfig::MySql(serde_yaml::from_str(&out_str)?), other => return Err(anyhow!("unsupported use type: {}", other)), }; Ok(result) diff --git a/src/risedevtool/src/risedev_env.rs b/src/risedevtool/src/risedev_env.rs index 1efdf1470998d..e2707c320f93f 100644 --- a/src/risedevtool/src/risedev_env.rs +++ b/src/risedevtool/src/risedev_env.rs @@ -77,6 +77,13 @@ pub fn generate_risedev_env(services: &Vec) -> String { writeln!(env, r#"RISEDEV_KAFKA_WITH_OPTIONS_COMMON="connector='kafka',properties.bootstrap.server='{brokers}'""#).unwrap(); writeln!(env, r#"RPK_BROKERS="{brokers}""#).unwrap(); } + ServiceConfig::MySql(c) => { + let host = &c.address; + let port = &c.port; + writeln!(env, r#"MYSQL_HOST="{host}""#,).unwrap(); + writeln!(env, r#"MYSQL_TCP_PORT="{port}""#,).unwrap(); + writeln!(env, r#"RISEDEV_MYSQL_WITH_OPTIONS_COMMON="connector='mysql-cdc',hostname='{host}',port='{port}'""#,).unwrap(); + } _ => {} } } diff --git a/src/risedevtool/src/service_config.rs b/src/risedevtool/src/service_config.rs index e5f149b8d10cd..19dffa388cdff 100644 --- a/src/risedevtool/src/service_config.rs +++ b/src/risedevtool/src/service_config.rs @@ -336,6 +336,23 @@ pub struct RedisConfig { pub address: String, } +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +#[serde(deny_unknown_fields)] +pub struct MySqlConfig { + #[serde(rename = "use")] + phantom_use: Option, + pub id: String, + + pub port: u16, + pub address: String, + + pub user: String, + pub password: String, + + pub user_managed: bool, +} + /// All service configuration #[derive(Clone, Debug, PartialEq)] pub enum ServiceConfig { @@ -356,6 +373,7 @@ pub enum ServiceConfig { Redis(RedisConfig), ZooKeeper(ZooKeeperConfig), RedPanda(RedPandaConfig), + MySql(MySqlConfig), } impl ServiceConfig { @@ -378,6 +396,7 @@ impl ServiceConfig { Self::Redis(c) => &c.id, Self::RedPanda(c) => &c.id, Self::Opendal(c) => &c.id, + Self::MySql(c) => &c.id, } } @@ -400,6 +419,7 @@ impl ServiceConfig { Self::Redis(c) => Some(c.port), Self::RedPanda(_c) => None, Self::Opendal(_) => None, + Self::MySql(c) => Some(c.port), } } @@ -422,6 +442,7 @@ impl ServiceConfig { Self::Redis(_c) => false, Self::RedPanda(_c) => false, Self::Opendal(_c) => false, + Self::MySql(c) => c.user_managed, } } } diff --git a/src/risedevtool/src/task/task_configure_grpc_node.rs b/src/risedevtool/src/task/task_configure_grpc_node.rs index 75a69a7cdb29f..5a556a5fca429 100644 --- a/src/risedevtool/src/task/task_configure_grpc_node.rs +++ b/src/risedevtool/src/task/task_configure_grpc_node.rs @@ -16,13 +16,13 @@ use anyhow::Result; use super::{ExecuteContext, Task}; -pub struct ConfigureGrpcNodeTask { +pub struct ConfigureTcpNodeTask { advertise_address: String, port: u16, user_managed: bool, } -impl ConfigureGrpcNodeTask { +impl ConfigureTcpNodeTask { pub fn new(advertise_address: String, port: u16, user_managed: bool) -> Result { Ok(Self { advertise_address, @@ -32,8 +32,12 @@ impl ConfigureGrpcNodeTask { } } -impl Task for ConfigureGrpcNodeTask { +impl Task for ConfigureTcpNodeTask { fn execute(&mut self, ctx: &mut ExecuteContext) -> anyhow::Result<()> { + assert!( + ctx.id.is_some(), + "Service should be set before executing ConfigureTcpNodeTask" + ); let address = format!("{}:{}", self.advertise_address, self.port); if self.user_managed { From 61c6f5031128b313a9e7690f0fdd8cd842feb6a8 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 23 Apr 2024 23:08:21 +0800 Subject: [PATCH 02/12] fix addr Signed-off-by: xxchan --- src/risedevtool/src/task.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/risedevtool/src/task.rs b/src/risedevtool/src/task.rs index bec0df5ae812e..e6b9f79244976 100644 --- a/src/risedevtool/src/task.rs +++ b/src/risedevtool/src/task.rs @@ -36,7 +36,7 @@ mod utils; mod zookeeper_service; use std::env; -use std::net::TcpStream; +use std::net::{TcpStream, ToSocketAddrs}; use std::path::{Path, PathBuf}; use std::process::{Command, Output}; use std::sync::Arc; @@ -255,7 +255,11 @@ where /// Wait for a user-managed service to be available pub fn wait_tcp_user(&mut self, server: impl AsRef) -> anyhow::Result<()> { - let addr = server.as_ref().parse()?; + let addr = server + .as_ref() + .to_socket_addrs()? + .next() + .expect(format!("failed to resolve {}", server.as_ref()).as_str()); wait( || { TcpStream::connect_timeout(&addr, Duration::from_secs(1))?; From 153e6f00b7a0cd834ce5b1d172fc612fe1f564cd Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 23 Apr 2024 23:17:20 +0800 Subject: [PATCH 03/12] minor Signed-off-by: xxchan --- e2e_test/source_inline/commands.toml | 12 ++++++++++-- src/risedevtool/src/task.rs | 2 +- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/e2e_test/source_inline/commands.toml b/e2e_test/source_inline/commands.toml index 8af865099ac7c..f616fb40711c9 100644 --- a/e2e_test/source_inline/commands.toml +++ b/e2e_test/source_inline/commands.toml @@ -39,8 +39,16 @@ fi category = "RiseDev - Test - Source Test - Kafka" description = "Delete all kafka topics." dependencies = ["kafka-hook"] -command = "rpk" -args = ["topic", "delete", "-r", "*"] +script = ''' +#!/usr/bin/env sh +set -e +if [ -n "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" ]; then + echo "Deleting all Kafka topics..." + rpk topic delete -r "*" +else + echo "No Kafka to clean." +fi +''' [tasks.kafka-topics] category = "RiseDev - Test - Source Test - Kafka" diff --git a/src/risedevtool/src/task.rs b/src/risedevtool/src/task.rs index e6b9f79244976..cb21d7809597a 100644 --- a/src/risedevtool/src/task.rs +++ b/src/risedevtool/src/task.rs @@ -259,7 +259,7 @@ where .as_ref() .to_socket_addrs()? .next() - .expect(format!("failed to resolve {}", server.as_ref()).as_str()); + .unwrap_or_else(|| panic!("failed to resolve {}", server.as_ref())); wait( || { TcpStream::connect_timeout(&addr, Duration::from_secs(1))?; From 8f10719eca56b736c0f49e4b79fe9082721e8363 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 23 Apr 2024 23:30:24 +0800 Subject: [PATCH 04/12] fix Signed-off-by: xxchan --- ci/scripts/e2e-source-test.sh | 12 ++++++------ .../source_inline/cdc/mysql/mysql_create_drop.slt | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 62dc2842879d2..5b776d91bd35d 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -31,6 +31,12 @@ buildkite-agent artifact download risingwave-connector.tar.gz ./ mkdir ./connector-node tar xf ./risingwave-connector.tar.gz -C ./connector-node +echo "--- e2e, inline test" +risedev ci-start ci-inline-source-test +risedev slt './e2e_test/source_inline/**/*.slt' +echo "--- Kill cluster" +risedev ci-kill + echo "--- Prepare data" cp src/connector/src/test_data/simple-schema.avsc ./avro-simple-schema.avsc cp src/connector/src/test_data/complex-schema.avsc ./avro-complex-schema.avsc @@ -162,9 +168,3 @@ risedev slt './e2e_test/source/basic/alter/kafka_after_new_data_2.slt' echo "--- Run CH-benCHmark" risedev slt './e2e_test/ch_benchmark/batch/ch_benchmark.slt' risedev slt './e2e_test/ch_benchmark/streaming/*.slt' - -echo "--- Kill cluster" -risedev ci-kill -echo "--- e2e, inline test" -risedev ci-start ci-inline-source-test -risedev slt './e2e_test/source_inline/**/*.slt' diff --git a/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt b/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt index 3ca952cc377f2..3dd9b977c4a7d 100644 --- a/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt +++ b/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt @@ -54,7 +54,7 @@ mysql -e " INSERT INTO tt3 VALUES (3, '2023-10-23 12:00:00'); INSERT INTO tt4 VALUES (4, '2023-10-23 13:00:00'); INSERT INTO tt5 VALUES (5, '2023-10-23 14:00:00'); -" +" testdb1 statement ok create table tt1 (v1 int, From 495f55cf2679202aafb1e02d27b547b77456aa1e Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 23 Apr 2024 23:46:10 +0800 Subject: [PATCH 05/12] fix Signed-off-by: xxchan --- src/risedevtool/src/risedev_env.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/risedevtool/src/risedev_env.rs b/src/risedevtool/src/risedev_env.rs index e2707c320f93f..24f18895434e9 100644 --- a/src/risedevtool/src/risedev_env.rs +++ b/src/risedevtool/src/risedev_env.rs @@ -80,8 +80,15 @@ pub fn generate_risedev_env(services: &Vec) -> String { ServiceConfig::MySql(c) => { let host = &c.address; let port = &c.port; + let user = &c.user; + let password = &c.password; + // These envs are used by `mysql` cli. writeln!(env, r#"MYSQL_HOST="{host}""#,).unwrap(); writeln!(env, r#"MYSQL_TCP_PORT="{port}""#,).unwrap(); + writeln!(env, r#"MYSQL_USER="{user}""#,).unwrap(); + writeln!(env, r#"MYSQL_PWD="{password}""#,).unwrap(); + // Note: user and password are not included in the common WITH options. + // It's expected to create another dedicated user for the source. writeln!(env, r#"RISEDEV_MYSQL_WITH_OPTIONS_COMMON="connector='mysql-cdc',hostname='{host}',port='{port}'""#,).unwrap(); } _ => {} From 4adab8ab53c6c50e8bdd9e79724deed16e85aa1d Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 24 Apr 2024 12:04:27 +0800 Subject: [PATCH 06/12] Update e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt --- e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt b/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt index 3dd9b977c4a7d..ae76e6918ae40 100644 --- a/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt +++ b/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt @@ -32,7 +32,7 @@ mysql -e " statement ok create source s with ( ${RISEDEV_MYSQL_WITH_OPTIONS_COMMON}, - username = 'non-shared-cdc', + username = 'shared-cdc', password = '123456', database.name = 'testdb1', server.id = '114514' From 05cdeda1db7d42923b40e579b0683fcf1bb946cd Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 24 Apr 2024 22:39:50 +0800 Subject: [PATCH 07/12] fix Signed-off-by: xxchan --- e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt b/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt index ae76e6918ae40..6691ec758f5c0 100644 --- a/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt +++ b/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt @@ -33,7 +33,7 @@ statement ok create source s with ( ${RISEDEV_MYSQL_WITH_OPTIONS_COMMON}, username = 'shared-cdc', - password = '123456', + password = 'abcdef', database.name = 'testdb1', server.id = '114514' ); From 35ab20d48ddb5b55a25a15839952d6ae63ceeef0 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 24 Apr 2024 22:43:51 +0800 Subject: [PATCH 08/12] resolve comments Signed-off-by: xxchan --- src/risedevtool/src/bin/risedev-dev.rs | 38 ++++---------------- src/risedevtool/src/task.rs | 2 ++ src/risedevtool/src/task/dummy_service.rs | 42 +++++++++++++++++++++++ 3 files changed, 51 insertions(+), 31 deletions(-) create mode 100644 src/risedevtool/src/task/dummy_service.rs diff --git a/src/risedevtool/src/bin/risedev-dev.rs b/src/risedevtool/src/bin/risedev-dev.rs index 47ef0e20341e3..7c741407dd169 100644 --- a/src/risedevtool/src/bin/risedev-dev.rs +++ b/src/risedevtool/src/bin/risedev-dev.rs @@ -18,16 +18,16 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::{Duration, Instant}; -use anyhow::{anyhow, Context, Result}; +use anyhow::{anyhow, bail, Context, Result}; use console::style; use fs_err::OpenOptions; use indicatif::ProgressBar; use risedev::util::{complete_spin, fail_spin}; use risedev::{ generate_risedev_env, preflight_check, CompactorService, ComputeNodeService, ConfigExpander, - ConfigureTmuxTask, EnsureStopService, ExecuteContext, FrontendService, GrafanaService, - KafkaService, MetaNodeService, MinioService, PrometheusService, PubsubService, RedisService, - ServiceConfig, SqliteConfig, Task, TempoService, ZooKeeperService, RISEDEV_NAME, + ConfigureTmuxTask, DummyService, EnsureStopService, ExecuteContext, FrontendService, + GrafanaService, KafkaService, MetaNodeService, MinioService, PrometheusService, PubsubService, + RedisService, ServiceConfig, SqliteConfig, Task, TempoService, ZooKeeperService, RISEDEV_NAME, }; use tempfile::tempdir; use thiserror_ext::AsReport; @@ -62,33 +62,6 @@ impl ProgressManager { } } -struct DummyService { - id: String, -} - -impl DummyService { - pub fn new(id: &str) -> Self { - Self { id: id.to_string() } - } -} - -impl Task for DummyService { - fn execute(&mut self, ctx: &mut ExecuteContext) -> anyhow::Result<()> { - ctx.service(self); - writeln!( - &mut ctx.log, - "{} is a dummy service. Please ensure it's correctly configured on your own! 🙏", - self.id - )?; - ctx.complete_spin(); - Ok(()) - } - - fn id(&self) -> String { - self.id.clone() - } -} - fn task_main( manager: &mut ProgressManager, services: &Vec, @@ -346,6 +319,9 @@ fn task_main( let mut ctx = ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone()); // TODO: support starting mysql in RiseDev. Currently it's user-managed only + if !c.user_managed { + bail!("Non user-managed MySQL is not supported yet. Please use user-managed MySQL."); + } DummyService::new(&c.id).execute(&mut ctx)?; let mut task = risedev::ConfigureTcpNodeTask::new(c.address.clone(), c.port, c.user_managed)?; diff --git a/src/risedevtool/src/task.rs b/src/risedevtool/src/task.rs index cb21d7809597a..84b2a1b9428e7 100644 --- a/src/risedevtool/src/task.rs +++ b/src/risedevtool/src/task.rs @@ -15,6 +15,7 @@ mod compactor_service; mod compute_node_service; mod configure_tmux_service; +mod dummy_service; mod ensure_stop_service; mod etcd_service; mod frontend_service; @@ -51,6 +52,7 @@ pub use utils::*; pub use self::compactor_service::*; pub use self::compute_node_service::*; pub use self::configure_tmux_service::*; +pub use self::dummy_service::DummyService; pub use self::ensure_stop_service::*; pub use self::etcd_service::*; pub use self::frontend_service::*; diff --git a/src/risedevtool/src/task/dummy_service.rs b/src/risedevtool/src/task/dummy_service.rs new file mode 100644 index 0000000000000..f3d1054b1ca23 --- /dev/null +++ b/src/risedevtool/src/task/dummy_service.rs @@ -0,0 +1,42 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::{ExecuteContext, Task}; + +pub struct DummyService { + id: String, +} + +impl DummyService { + pub fn new(id: &str) -> Self { + Self { id: id.to_string() } + } +} + +impl Task for DummyService { + fn execute(&mut self, ctx: &mut ExecuteContext) -> anyhow::Result<()> { + ctx.service(self); + writeln!( + &mut ctx.log, + "{} is a dummy service. Please ensure it's correctly configured on your own! 🙏", + self.id + )?; + ctx.complete_spin(); + Ok(()) + } + + fn id(&self) -> String { + self.id.clone() + } +} From 6352897d40803cd0baccd5daaa89af753a1b5d29 Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 25 Apr 2024 17:36:51 +0800 Subject: [PATCH 09/12] try Signed-off-by: xxchan --- .../source_inline/cdc/mysql/mysql_create_drop.slt | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt b/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt index 6691ec758f5c0..e2119fc3f5ee6 100644 --- a/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt +++ b/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt @@ -10,10 +10,15 @@ mysql -e " DROP DATABASE IF EXISTS testdb1; CREATE DATABASE testdb1; USE testdb1; CREATE TABLE tt1 (v1 int primary key, v2 timestamp); + INSERT INTO tt1 VALUES (1, '2023-10-23 10:00:00'); CREATE TABLE tt2 (v1 int primary key, v2 timestamp); + INSERT INTO tt2 VALUES (2, '2023-10-23 11:00:00'); CREATE TABLE tt3 (v1 int primary key, v2 timestamp); + INSERT INTO tt3 VALUES (3, '2023-10-23 12:00:00'); CREATE TABLE tt4 (v1 int primary key, v2 timestamp); + INSERT INTO tt4 VALUES (4, '2023-10-23 13:00:00'); CREATE TABLE tt5 (v1 int primary key, v2 timestamp); + INSERT INTO tt5 VALUES (5, '2023-10-23 14:00:00'); " system ok @@ -47,15 +52,6 @@ internal_table.mjs --name s --type '' --count count: 1 -system ok -mysql -e " - INSERT INTO tt1 VALUES (1, '2023-10-23 10:00:00'); - INSERT INTO tt2 VALUES (2, '2023-10-23 11:00:00'); - INSERT INTO tt3 VALUES (3, '2023-10-23 12:00:00'); - INSERT INTO tt4 VALUES (4, '2023-10-23 13:00:00'); - INSERT INTO tt5 VALUES (5, '2023-10-23 14:00:00'); -" testdb1 - statement ok create table tt1 (v1 int, v2 timestamptz, From 0b01b08465abd1b3a8c582d4f9819461122f1b7d Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 25 Apr 2024 19:16:15 +0800 Subject: [PATCH 10/12] update comment Signed-off-by: xxchan --- e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt b/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt index e2119fc3f5ee6..fcd9081632c35 100644 --- a/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt +++ b/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt @@ -43,9 +43,9 @@ create source s with ( server.id = '114514' ); -sleep 1s - -# There's no data in upstream yet, but the offset is updated in the source executor via the heartbeat message. +# SourceExecutor only receives new data after it's created. +# But it can receive offset update at the beginning and periodically +# via the heartbeat message. system ok internal_table.mjs --name s --type '' --count ---- From de76190aef1186ba29b203223ee93641be8571a4 Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 25 Apr 2024 20:09:03 +0800 Subject: [PATCH 11/12] add a sleep Signed-off-by: xxchan --- e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt b/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt index fcd9081632c35..3469137113dae 100644 --- a/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt +++ b/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt @@ -43,6 +43,8 @@ create source s with ( server.id = '114514' ); +sleep 1s + # SourceExecutor only receives new data after it's created. # But it can receive offset update at the beginning and periodically # via the heartbeat message. From 11376303079fbe0abf322afe64b4b5536a4d00f4 Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 25 Apr 2024 20:32:10 +0800 Subject: [PATCH 12/12] Update e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt --- e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt b/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt index 3469137113dae..e6ce86aaf06fa 100644 --- a/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt +++ b/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt @@ -43,7 +43,7 @@ create source s with ( server.id = '114514' ); -sleep 1s +sleep 2s # SourceExecutor only receives new data after it's created. # But it can receive offset update at the beginning and periodically