Skip to content

Commit

Permalink
feat: pause shared SourceExecutor until a downstream actor is created (
Browse files Browse the repository at this point in the history
…#16348)

Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan authored Apr 25, 2024
1 parent d5be17d commit af9ac6d
Show file tree
Hide file tree
Showing 18 changed files with 347 additions and 137 deletions.
22 changes: 12 additions & 10 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,7 @@ rm -rf "${PREFIX_PROFILING}"
[tasks.reset-rw]
category = "RiseDev - Start/Stop"
description = "Clean all data in the default database dev of the running RisingWave"
dependencies = ["check-risedev-env-file"]
env_files = ["${PREFIX_CONFIG}/risedev-env"]
dependencies = ["check-and-load-risedev-env-file"]
script = '''
#!/usr/bin/env bash
psql -h $RISEDEV_RW_FRONTEND_LISTEN_ADDRESS -p $RISEDEV_RW_FRONTEND_PORT -U root -d dev -c "CREATE DATABASE risedev_tmp;"
Expand Down Expand Up @@ -568,11 +567,16 @@ if [ ! -f "${RC_ENV_FILE}" ]; then
fi
'''

[tasks.check-and-load-risedev-env-file]
private = true
category = "RiseDev - Prepare"
dependencies = ["check-risedev-env-file"]
env_files = ["${PREFIX_CONFIG}/risedev-env"]

[tasks.psql-env]
category = "RiseDev - Start/Stop"
description = "Dump env configuration for psql"
dependencies = ["check-risedev-env-file"]
env_files = ["${PREFIX_CONFIG}/risedev-env"]
dependencies = ["check-and-load-risedev-env-file"]
script = '''
#!/usr/bin/env bash
cat <<EOF > "${PREFIX_CONFIG}/psql-env"
Expand All @@ -590,8 +594,7 @@ echo " $(tput setaf 4)source ${PREFIX_CONFIG}/psql-env$(tput sgr0)"
[tasks.psql]
category = "RiseDev - Start/Stop"
description = "Run local psql client with default connection parameters. You can pass extra arguments to psql."
dependencies = ["check-risedev-env-file"]
env_files = ["${PREFIX_CONFIG}/risedev-env"]
dependencies = ["check-and-load-risedev-env-file"]
script = '''
#!/usr/bin/env bash
psql -h $RISEDEV_RW_FRONTEND_LISTEN_ADDRESS -p $RISEDEV_RW_FRONTEND_PORT -U root -d dev "$@"
Expand All @@ -600,8 +603,7 @@ psql -h $RISEDEV_RW_FRONTEND_LISTEN_ADDRESS -p $RISEDEV_RW_FRONTEND_PORT -U root
[tasks.ctl]
category = "RiseDev - Start/Stop"
description = "Start RiseCtl"
dependencies = ["check-risedev-env-file"]
env_files = ["${PREFIX_CONFIG}/risedev-env"]
dependencies = ["check-and-load-risedev-env-file"]
script = '''
#!/usr/bin/env bash
cargo run -p risingwave_cmd_all --profile "${RISINGWAVE_BUILD_PROFILE}" -- ctl "$@"
Expand Down Expand Up @@ -1312,8 +1314,7 @@ category = "RiseDev - Test - SQLLogicTest"
install_crate = { version = "0.20.1", crate_name = "sqllogictest-bin", binary = "sqllogictest", test_arg = [
"--help",
], install_command = "binstall" }
dependencies = ["check-risedev-env-file"]
env_files = ["${PREFIX_CONFIG}/risedev-env"]
dependencies = ["check-and-load-risedev-env-file"]
command = "sqllogictest"
args = ["${@}"]
description = "🌟 Run SQLLogicTest"
Expand Down Expand Up @@ -1452,6 +1453,7 @@ dependencies = ["check-risedev-env-file"]
script = '''
#!/usr/bin/env bash
set -euo pipefail
cat ${PREFIX_CONFIG}/risedev-env
echo "Hint: To load the environment variables into the shell, you may run:"
echo "$(tput setaf 4)\set -a; source ${PREFIX_CONFIG}/risedev-env; set +a$(tput sgr0)"
Expand Down
58 changes: 0 additions & 58 deletions e2e_test/source/basic/kafka_shared_source.slt

This file was deleted.

24 changes: 17 additions & 7 deletions e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,23 @@ create source s with (

sleep 2s

# SourceExecutor only receives new data after it's created.
# At the beginning, the source is paused. It will resume after a downstream is created.
system ok
internal_table.mjs --name s --type '' --count
----
count: 0


statement ok
create table tt1_shared (v1 int,
v2 timestamptz,
PRIMARY KEY (v1)
) from s table 'testdb1.tt1';

sleep 2s

# The source is resumed.
# SourceExecutor does not handle historical data, and only receives new data after it's created.
# But it can receive offset update at the beginning and periodically
# via the heartbeat message.
system ok
Expand Down Expand Up @@ -114,12 +130,6 @@ create table tt5 (v1 int,
table.name = 'tt5',
);

statement ok
create table tt1_shared (v1 int,
v2 timestamptz,
PRIMARY KEY (v1)
) from s table 'testdb1.tt1';

statement ok
create table tt2_shared (v1 int,
v2 timestamptz,
Expand Down
23 changes: 10 additions & 13 deletions e2e_test/source_inline/commands.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
# This file contains commands used by the tests.

[tasks.source-test-hook]
private = true
dependencies = ["check-risedev-env-file"]
env_files = ["${PREFIX_CONFIG}/risedev-env"]

# Note about the Kafka CLI tooling:
# - Built-in Kafka console tools:
Expand All @@ -16,10 +12,10 @@ env_files = ["${PREFIX_CONFIG}/risedev-env"]
# - rpk:
# Golang based.
# Style example: RPK_BROKERS=localhost:9092 rpk topic create t
[tasks.kafka-hook]
[tasks.check-kafka]
private = true
description = "Check if Kafka is started by RiseDev"
dependencies = ["source-test-hook"]
dependencies = ["check-and-load-risedev-env-file"]
script = '''
#!/usr/bin/env sh
set -e
Expand All @@ -38,10 +34,11 @@ fi
[tasks.clean-kafka]
category = "RiseDev - Test - Source Test - Kafka"
description = "Delete all kafka topics."
dependencies = ["kafka-hook"]
dependencies = ["check-and-load-risedev-env-file"]
script = '''
#!/usr/bin/env sh
set -e
if [ -n "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" ]; then
echo "Deleting all Kafka topics..."
rpk topic delete -r "*"
Expand All @@ -52,7 +49,7 @@ fi

[tasks.kafka-topics]
category = "RiseDev - Test - Source Test - Kafka"
dependencies = ["kafka-hook"]
dependencies = ["check-kafka"]
script = """
#!/usr/bin/env sh
set -e
Expand All @@ -61,7 +58,7 @@ ${PREFIX_BIN}/kafka/bin/kafka-topics.sh --bootstrap-server ${RISEDEV_KAFKA_BOOTS

[tasks.kafka-produce]
category = "RiseDev - Test - Source Test - Kafka"
dependencies = ["kafka-hook"]
dependencies = ["check-kafka"]
script = """
#!/usr/bin/env sh
set -e
Expand All @@ -70,7 +67,7 @@ ${PREFIX_BIN}/kafka/bin/kafka-console-producer.sh --bootstrap-server ${RISEDEV_K

[tasks.kafka-consume]
category = "RiseDev - Test - Source Test - Kafka"
dependencies = ["kafka-hook"]
dependencies = ["check-kafka"]
script = """
#!/usr/bin/env sh
set -e
Expand All @@ -79,7 +76,7 @@ ${PREFIX_BIN}/kafka/bin/kafka-console-consumer.sh --bootstrap-server ${RISEDEV_K

[tasks.kafka-consumer-groups]
category = "RiseDev - Test - Source Test - Kafka"
dependencies = ["kafka-hook"]
dependencies = ["check-kafka"]
script = """
#!/usr/bin/env sh
set -e
Expand All @@ -89,7 +86,7 @@ ${PREFIX_BIN}/kafka/bin/kafka-consumer-groups.sh --bootstrap-server ${RISEDEV_KA
# rpk tools
[tasks.rpk]
category = "RiseDev - Test - Source Test - Kafka"
dependencies = ["kafka-hook"]
dependencies = ["check-kafka"]
# check https://docs.redpanda.com/current/reference/rpk/rpk-x-options/ or rpk -X help/list for options
script = """
#!/usr/bin/env sh
Expand All @@ -106,7 +103,7 @@ rpk "$@"
[tasks.redpanda-console]
category = "RiseDev - Test - Source Test - Kafka"
description = "Start Redpanda console (Kafka GUI) at localhost:8080."
dependencies = ["kafka-hook"]
dependencies = ["check-kafka"]
script = '''
#!/usr/bin/env sh
set -e
Expand Down
9 changes: 9 additions & 0 deletions e2e_test/source_inline/kafka/b.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/bin/bash
for i in {0..9}; do
cat <<EOF | rpk topic produce shared_source -f "%p %v\\n" -p 0
0 {"v1": 1, "v2": "a"}
1 {"v1": 2, "v2": "b"}
2 {"v1": 3, "v2": "c"}
3 {"v1": 4, "v2": "d"}
EOF
done
13 changes: 5 additions & 8 deletions e2e_test/source_inline/kafka/consumer_group.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@
// zx: A tool for writing better scripts
// https://google.github.io/zx/

const {
mv: mv,
_: _command,
} = minimist(process.argv.slice(3), {
string: ["mv", "topic"],
const { mv: mv, _: _command } = minimist(process.argv.slice(3), {
string: ["mv"],
_: ["list-members", "list-lags"],
});
const command = _command[0];
Expand Down Expand Up @@ -62,7 +59,7 @@ async function list_consumer_group_members(fragment_id) {
const groups = await list_consumer_groups(fragment_id);
return Promise.all(
groups.map(async (group_name) => {
return (await describe_consumer_group(group_name))["MEMBERS"]
return (await describe_consumer_group(group_name))["MEMBERS"];
})
);
}
Expand All @@ -71,14 +68,14 @@ async function list_consumer_group_lags(fragment_id) {
const groups = await list_consumer_groups(fragment_id);
return Promise.all(
groups.map(async (group_name) => {
return (await describe_consumer_group(group_name))["TOTAL-LAG"]
return (await describe_consumer_group(group_name))["TOTAL-LAG"];
})
);
}

const fragment_id = await get_fragment_id_of_mv(mv);
if (command == "list-groups") {
echo`${(await list_consumer_groups(fragment_id))}`;
echo`${await list_consumer_groups(fragment_id)}`;
} else if (command == "list-members") {
echo`${await list_consumer_group_members(fragment_id)}`;
} else if (command == "list-lags") {
Expand Down
Loading

0 comments on commit af9ac6d

Please sign in to comment.