Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
commit ea28be3cc7c6186f93a86a3fb8bb3c8123074f8c
Author: xxchan <[email protected]>
Date:   Thu Apr 25 21:08:32 2024 +0800

    fix

    Signed-off-by: xxchan <[email protected]>

commit 0bcc42a7ca96bdb6de9f3ee2308ee86cc1dfcd61
Author: xxchan <[email protected]>
Date:   Thu Apr 25 20:17:40 2024 +0800

    fix

    Signed-off-by: xxchan <[email protected]>

commit 5de9bdce1ec5f1773107f550cd4b4667877721f2
Author: xxchan <[email protected]>
Date:   Thu Apr 25 17:04:56 2024 +0800

    resolve comment

    Signed-off-by: xxchan <[email protected]>

commit b0f5dff1db795ff852090a572e73910874b89d1d
Author: xxchan <[email protected]>
Date:   Wed Apr 24 08:06:41 2024 +0800

    update

commit f38cb387d6254ff40aac0d02d9fcbaa212ce7b32
Author: xxchan <[email protected]>
Date:   Wed Apr 24 08:04:04 2024 +0800

    update cdc test

    Signed-off-by: xxchan <[email protected]>

commit d5afeb8756a4c029e441b7983d069dcc8767c29e
Author: xxchan <[email protected]>
Date:   Mon Apr 22 15:33:11 2024 +0800

    fix: only resume for shared

    Signed-off-by: xxchan <[email protected]>

commit 21163412cb1201dc2190c3b0e285e88f8e443f17
Author: xxchan <[email protected]>
Date:   Mon Apr 22 15:17:33 2024 +0800

    clippy

    Signed-off-by: xxchan <[email protected]>

commit 0d0a304623fff4732126b61f9a977bcb18e56dd9
Author: xxchan <[email protected]>
Date:   Mon Apr 22 15:00:02 2024 +0800

    update tests

    Signed-off-by: xxchan <[email protected]>

commit a0e9670e6002602fd7c508e6f79b6a1f57d291b9
Author: xxchan <[email protected]>
Date:   Sat Apr 20 17:48:51 2024 +0800

    fix

    Signed-off-by: xxchan <[email protected]>

commit 572eab4b35a1d4c0a2352deabd9fa307843cd2c5
Author: xxchan <[email protected]>
Date:   Sat Apr 20 17:10:58 2024 +0800

    clean

    Signed-off-by: xxchan <[email protected]>

commit eee0f6c78822b3d937725bd2a76af23b3b15909a
Author: xxchan <[email protected]>
Date:   Sat Apr 20 17:08:32 2024 +0800

    cleanup state table changes

    Signed-off-by: xxchan <[email protected]>

commit ad90536448a2bd16b95b0dab629a428592a19f7d
Author: xxchan <[email protected]>
Date:   Sat Apr 20 17:01:21 2024 +0800

    refactor

    Signed-off-by: xxchan <[email protected]>

commit 017e90ffc73993a5f88e3a02b4e78156a6b92b9a
Author: xxchan <[email protected]>
Date:   Fri Apr 19 16:33:05 2024 +0800

    add doc

    Signed-off-by: xxchan <[email protected]>

commit b893b915997481fe61c9c6963b75b4769e5f238c
Author: xxchan <[email protected]>
Date:   Wed Apr 17 02:18:00 2024 +0800

    feat: pause shared source until a MV is created

    Signed-off-by: xxchan <[email protected]>

Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Apr 25, 2024
1 parent 1137630 commit 0d49ccb
Show file tree
Hide file tree
Showing 17 changed files with 334 additions and 137 deletions.
22 changes: 12 additions & 10 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,7 @@ rm -rf "${PREFIX_PROFILING}"
[tasks.reset-rw]
category = "RiseDev - Start/Stop"
description = "Clean all data in the default database dev of the running RisingWave"
dependencies = ["check-risedev-env-file"]
env_files = ["${PREFIX_CONFIG}/risedev-env"]
dependencies = ["check-and-load-risedev-env-file"]
script = '''
#!/usr/bin/env bash
psql -h $RISEDEV_RW_FRONTEND_LISTEN_ADDRESS -p $RISEDEV_RW_FRONTEND_PORT -U root -d dev -c "CREATE DATABASE risedev_tmp;"
Expand Down Expand Up @@ -568,11 +567,16 @@ if [ ! -f "${RC_ENV_FILE}" ]; then
fi
'''

[tasks.check-and-load-risedev-env-file]
private = true
category = "RiseDev - Prepare"
dependencies = ["check-risedev-env-file"]
env_files = ["${PREFIX_CONFIG}/risedev-env"]

[tasks.psql-env]
category = "RiseDev - Start/Stop"
description = "Dump env configuration for psql"
dependencies = ["check-risedev-env-file"]
env_files = ["${PREFIX_CONFIG}/risedev-env"]
dependencies = ["check-and-load-risedev-env-file"]
script = '''
#!/usr/bin/env bash
cat <<EOF > "${PREFIX_CONFIG}/psql-env"
Expand All @@ -590,8 +594,7 @@ echo " $(tput setaf 4)source ${PREFIX_CONFIG}/psql-env$(tput sgr0)"
[tasks.psql]
category = "RiseDev - Start/Stop"
description = "Run local psql client with default connection parameters. You can pass extra arguments to psql."
dependencies = ["check-risedev-env-file"]
env_files = ["${PREFIX_CONFIG}/risedev-env"]
dependencies = ["check-and-load-risedev-env-file"]
script = '''
#!/usr/bin/env bash
psql -h $RISEDEV_RW_FRONTEND_LISTEN_ADDRESS -p $RISEDEV_RW_FRONTEND_PORT -U root -d dev "$@"
Expand All @@ -600,8 +603,7 @@ psql -h $RISEDEV_RW_FRONTEND_LISTEN_ADDRESS -p $RISEDEV_RW_FRONTEND_PORT -U root
[tasks.ctl]
category = "RiseDev - Start/Stop"
description = "Start RiseCtl"
dependencies = ["check-risedev-env-file"]
env_files = ["${PREFIX_CONFIG}/risedev-env"]
dependencies = ["check-and-load-risedev-env-file"]
script = '''
#!/usr/bin/env bash
cargo run -p risingwave_cmd_all --profile "${RISINGWAVE_BUILD_PROFILE}" -- ctl "$@"
Expand Down Expand Up @@ -1312,8 +1314,7 @@ category = "RiseDev - Test - SQLLogicTest"
install_crate = { version = "0.20.1", crate_name = "sqllogictest-bin", binary = "sqllogictest", test_arg = [
"--help",
], install_command = "binstall" }
dependencies = ["check-risedev-env-file"]
env_files = ["${PREFIX_CONFIG}/risedev-env"]
dependencies = ["check-and-load-risedev-env-file"]
command = "sqllogictest"
args = ["${@}"]
description = "🌟 Run SQLLogicTest"
Expand Down Expand Up @@ -1452,6 +1453,7 @@ dependencies = ["check-risedev-env-file"]
script = '''
#!/usr/bin/env bash
set -euo pipefail
cat ${PREFIX_CONFIG}/risedev-env
echo "Hint: To load the environment variables into the shell, you may run:"
echo "$(tput setaf 4)\set -a; source ${PREFIX_CONFIG}/risedev-env; set +a$(tput sgr0)"
Expand Down
58 changes: 0 additions & 58 deletions e2e_test/source/basic/kafka_shared_source.slt

This file was deleted.

21 changes: 14 additions & 7 deletions e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,20 @@ create source s with (

sleep 2s

# SourceExecutor only receives new data after it's created.
# At the beginning, the source is paused. It will resume after a downstream is created.
system ok
internal_table.mjs --name s --type '' --count
----
count: 0

statement ok
create table tt1_shared (v1 int,
v2 timestamptz,
PRIMARY KEY (v1)
) from s table 'testdb1.tt1';

# The source is resumed.
# SourceExecutor does not handle historical data, and only receives new data after it's created.
# But it can receive offset update at the beginning and periodically
# via the heartbeat message.
system ok
Expand Down Expand Up @@ -114,12 +127,6 @@ create table tt5 (v1 int,
table.name = 'tt5',
);

statement ok
create table tt1_shared (v1 int,
v2 timestamptz,
PRIMARY KEY (v1)
) from s table 'testdb1.tt1';

statement ok
create table tt2_shared (v1 int,
v2 timestamptz,
Expand Down
23 changes: 10 additions & 13 deletions e2e_test/source_inline/commands.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
# This file contains commands used by the tests.

[tasks.source-test-hook]
private = true
dependencies = ["check-risedev-env-file"]
env_files = ["${PREFIX_CONFIG}/risedev-env"]

# Note about the Kafka CLI tooling:
# - Built-in Kafka console tools:
Expand All @@ -16,10 +12,10 @@ env_files = ["${PREFIX_CONFIG}/risedev-env"]
# - rpk:
# Golang based.
# Style example: RPK_BROKERS=localhost:9092 rpk topic create t
[tasks.kafka-hook]
[tasks.check-kafka]
private = true
description = "Check if Kafka is started by RiseDev"
dependencies = ["source-test-hook"]
dependencies = ["check-and-load-risedev-env-file"]
script = '''
#!/usr/bin/env sh
set -e
Expand All @@ -38,10 +34,11 @@ fi
[tasks.clean-kafka]
category = "RiseDev - Test - Source Test - Kafka"
description = "Delete all kafka topics."
dependencies = ["kafka-hook"]
dependencies = ["check-and-load-risedev-env-file"]
script = '''
#!/usr/bin/env sh
set -e
if [ -n "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" ]; then
echo "Deleting all Kafka topics..."
rpk topic delete -r "*"
Expand All @@ -52,7 +49,7 @@ fi

[tasks.kafka-topics]
category = "RiseDev - Test - Source Test - Kafka"
dependencies = ["kafka-hook"]
dependencies = ["check-kafka"]
script = """
#!/usr/bin/env sh
set -e
Expand All @@ -61,7 +58,7 @@ ${PREFIX_BIN}/kafka/bin/kafka-topics.sh --bootstrap-server ${RISEDEV_KAFKA_BOOTS

[tasks.kafka-produce]
category = "RiseDev - Test - Source Test - Kafka"
dependencies = ["kafka-hook"]
dependencies = ["check-kafka"]
script = """
#!/usr/bin/env sh
set -e
Expand All @@ -70,7 +67,7 @@ ${PREFIX_BIN}/kafka/bin/kafka-console-producer.sh --bootstrap-server ${RISEDEV_K

[tasks.kafka-consume]
category = "RiseDev - Test - Source Test - Kafka"
dependencies = ["kafka-hook"]
dependencies = ["check-kafka"]
script = """
#!/usr/bin/env sh
set -e
Expand All @@ -79,7 +76,7 @@ ${PREFIX_BIN}/kafka/bin/kafka-console-consumer.sh --bootstrap-server ${RISEDEV_K

[tasks.kafka-consumer-groups]
category = "RiseDev - Test - Source Test - Kafka"
dependencies = ["kafka-hook"]
dependencies = ["check-kafka"]
script = """
#!/usr/bin/env sh
set -e
Expand All @@ -89,7 +86,7 @@ ${PREFIX_BIN}/kafka/bin/kafka-consumer-groups.sh --bootstrap-server ${RISEDEV_KA
# rpk tools
[tasks.rpk]
category = "RiseDev - Test - Source Test - Kafka"
dependencies = ["kafka-hook"]
dependencies = ["check-kafka"]
# check https://docs.redpanda.com/current/reference/rpk/rpk-x-options/ or rpk -X help/list for options
script = """
#!/usr/bin/env sh
Expand All @@ -106,7 +103,7 @@ rpk "$@"
[tasks.redpanda-console]
category = "RiseDev - Test - Source Test - Kafka"
description = "Start Redpanda console (Kafka GUI) at localhost:8080."
dependencies = ["kafka-hook"]
dependencies = ["check-kafka"]
script = '''
#!/usr/bin/env sh
set -e
Expand Down
13 changes: 5 additions & 8 deletions e2e_test/source_inline/kafka/consumer_group.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@
// zx: A tool for writing better scripts
// https://google.github.io/zx/

const {
mv: mv,
_: _command,
} = minimist(process.argv.slice(3), {
string: ["mv", "topic"],
const { mv: mv, _: _command } = minimist(process.argv.slice(3), {
string: ["mv"],
_: ["list-members", "list-lags"],
});
const command = _command[0];
Expand Down Expand Up @@ -62,7 +59,7 @@ async function list_consumer_group_members(fragment_id) {
const groups = await list_consumer_groups(fragment_id);
return Promise.all(
groups.map(async (group_name) => {
return (await describe_consumer_group(group_name))["MEMBERS"]
return (await describe_consumer_group(group_name))["MEMBERS"];
})
);
}
Expand All @@ -71,14 +68,14 @@ async function list_consumer_group_lags(fragment_id) {
const groups = await list_consumer_groups(fragment_id);
return Promise.all(
groups.map(async (group_name) => {
return (await describe_consumer_group(group_name))["TOTAL-LAG"]
return (await describe_consumer_group(group_name))["TOTAL-LAG"];
})
);
}

const fragment_id = await get_fragment_id_of_mv(mv);
if (command == "list-groups") {
echo`${(await list_consumer_groups(fragment_id))}`;
echo`${await list_consumer_groups(fragment_id)}`;
} else if (command == "list-members") {
echo`${await list_consumer_group_members(fragment_id)}`;
} else if (command == "list-lags") {
Expand Down
Loading

0 comments on commit 0d49ccb

Please sign in to comment.