Skip to content

Commit

Permalink
refactor(test): use ci-provisioned message-queue service in backwards…
Browse files Browse the repository at this point in the history
…-compat tests (#16584) (#16786)

Signed-off-by: Bugen Zhao <[email protected]>
Co-authored-by: Bugen Zhao <[email protected]>
  • Loading branch information
github-actions[bot] and BugenZhao authored May 16, 2024
1 parent b7a1408 commit b21e9f5
Show file tree
Hide file tree
Showing 11 changed files with 38 additions and 107 deletions.
7 changes: 4 additions & 3 deletions backwards-compat-tests/scripts/run_local.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@ full-without-monitoring:
- use: compute-node
- use: frontend
- use: compactor
- use: zookeeper
- use: kafka
user-managed: true
address: message_queue
port: 29092
EOF

cat <<EOF > risedev-components.user.env
RISEDEV_CONFIGURED=false
ENABLE_MINIO=true
ENABLE_ETCD=true
ENABLE_KAFKA=true
# Fetch risingwave binary from release.
ENABLE_BUILD_RUST=true
Expand Down Expand Up @@ -65,4 +66,4 @@ main() {
validate_new_cluster "$NEW_VERSION"
}

main
main
53 changes: 6 additions & 47 deletions backwards-compat-tests/scripts/utils.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,36 +19,9 @@ RECOVERY_DURATION=20

# Setup test directory
TEST_DIR=.risingwave/backwards-compat-tests/
KAFKA_PATH=.risingwave/bin/kafka
mkdir -p $TEST_DIR
cp -r backwards-compat-tests/slt/* $TEST_DIR

wait_kafka_exit() {
# Follow kafka-server-stop.sh
while [[ -n "$(ps ax | grep ' kafka\.Kafka ' | grep java | grep -v grep | awk '{print $1}')" ]]; do
echo "Waiting for kafka to exit"
sleep 1
done
}

wait_zookeeper_exit() {
# Follow zookeeper-server-stop.sh
while [[ -n "$(ps ax | grep java | grep -i QuorumPeerMain | grep -v grep | awk '{print $1}')" ]]; do
echo "Waiting for zookeeper to exit"
sleep 1
done
}

kill_kafka() {
$KAFKA_PATH/bin/kafka-server-stop.sh
wait_kafka_exit
}

kill_zookeeper() {
$KAFKA_PATH/bin/zookeeper-server-stop.sh
wait_zookeeper_exit
}

wait_for_process() {
process_name="$1"

Expand Down Expand Up @@ -77,21 +50,9 @@ kill_cluster() {

# Kill other components
$TMUX list-windows -t risedev -F "#{window_name} #{pane_id}" |
grep -v 'kafka' |
grep -v 'zookeeper' |
awk '{ print $2 }' |
xargs -I {} $TMUX send-keys -t {} C-c C-d

set +e
if [[ -n $($TMUX list-windows -t risedev | grep kafka) ]]; then
echo "kill kafka"
kill_kafka

echo "kill zookeeper"
kill_zookeeper
fi
set -e

$TMUX kill-server
test $? -eq 0 || {
echo "Failed to stop all RiseDev components."
Expand All @@ -117,18 +78,16 @@ check_version() {
}

create_kafka_topic() {
"$KAFKA_PATH"/bin/kafka-topics.sh \
--create \
--topic backwards_compat_test_kafka_source --bootstrap-server localhost:29092
RPK_BROKERS=message_queue:29092 \
rpk topic create backwards_compat_test_kafka_source
}

insert_json_kafka() {
local JSON=$1
echo "$JSON" | "$KAFKA_PATH"/bin/kafka-console-producer.sh \
--topic backwards_compat_test_kafka_source \
--bootstrap-server localhost:29092 \
--property "parse.key=true" \
--property "key.separator=,"

echo "$JSON" | \
RPK_BROKERS=message_queue:29092 \
rpk topic produce backwards_compat_test_kafka_source -f "%k,%v"
}

seed_json_kafka() {
Expand Down
2 changes: 1 addition & 1 deletion backwards-compat-tests/slt/kafka/invalid_options/seed.slt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ CREATE SOURCE IF NOT EXISTS kafka_source_with_invalid_option
WITH (
connector='kafka',
topic='backwards_compat_test_kafka_source',
properties.bootstrap.server='localhost:29092',
properties.bootstrap.server='message_queue:29092',
scan.startup.mode='earliest',
invalid_option='oops'
) FORMAT PLAIN ENCODE JSON;
Expand Down
2 changes: 1 addition & 1 deletion backwards-compat-tests/slt/kafka/seed.slt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ CREATE SOURCE IF NOT EXISTS kafka_source
WITH (
connector='kafka',
topic='backwards_compat_test_kafka_source',
properties.bootstrap.server='localhost:29092',
properties.bootstrap.server='message_queue:29092',
scan.startup.mode='earliest',
) FORMAT PLAIN ENCODE JSON;

Expand Down
4 changes: 2 additions & 2 deletions backwards-compat-tests/slt/kafka/upsert/deprecate_upsert.slt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ CREATE TABLE IF NOT EXISTS kafka_table
WITH (
connector='kafka',
topic='backwards_compat_test_kafka_source',
properties.bootstrap.server='localhost:29092',
properties.bootstrap.server='message_queue:29092',
scan.startup.mode='earliest',
) FORMAT UPSERT ENCODE JSON;
) FORMAT UPSERT ENCODE JSON;
4 changes: 2 additions & 2 deletions backwards-compat-tests/slt/kafka/upsert/include_key_as.slt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ INCLUDE key as _rw_key
WITH (
connector='kafka',
topic='backwards_compat_test_kafka_source',
properties.bootstrap.server='localhost:29092',
properties.bootstrap.server='message_queue:29092',
scan.startup.mode='earliest',
) FORMAT UPSERT ENCODE JSON;
) FORMAT UPSERT ENCODE JSON;
46 changes: 7 additions & 39 deletions ci/scripts/backwards-compat-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ source backwards-compat-tests/scripts/utils.sh

configure_rw() {
VERSION="$1"
ENABLE_BUILD="$2"

echo "--- Setting up cluster config"
cat <<EOF > risedev-profiles.user.yml
Expand All @@ -49,19 +50,16 @@ full-without-monitoring:
- use: compute-node
- use: frontend
- use: compactor
- use: zookeeper
- use: kafka
EOF

cat <<EOF > risedev-components.user.env
RISEDEV_CONFIGURED=true
ENABLE_MINIO=true
ENABLE_ETCD=true
ENABLE_KAFKA=true
# Fetch risingwave binary from release.
ENABLE_BUILD_RUST=false
# Whether to build or directly fetch binary from release.
ENABLE_BUILD_RUST=$ENABLE_BUILD
# Use target/debug for simplicity.
ENABLE_RELEASE_PROFILE=false
Expand All @@ -73,36 +71,6 @@ if version_le "${VERSION:-}" "1.8.0" ; then
fi
}

configure_rw_build() {
echo "--- Setting up cluster config"
cat <<EOF > risedev-profiles.user.yml
full-without-monitoring:
steps:
- use: minio
- use: etcd
- use: meta-node
- use: compute-node
- use: frontend
- use: compactor
- use: zookeeper
- use: kafka
EOF

cat <<EOF > risedev-components.user.env
RISEDEV_CONFIGURED=true
ENABLE_MINIO=true
ENABLE_ETCD=true
ENABLE_KAFKA=true
# Make sure that it builds
ENABLE_BUILD_RUST=true
# Use target/debug for simplicity.
ENABLE_RELEASE_PROFILE=false
EOF
}

setup_old_cluster() {
echo "--- Build risedev for $OLD_VERSION, it may not be backwards compatible"
git config --global --add safe.directory /risingwave
Expand All @@ -115,15 +83,15 @@ setup_old_cluster() {
if [[ "$?" -ne 0 ]]; then
set -e
echo "Failed to download ${OLD_VERSION} from github releases, build from source later during \`risedev d\`"
configure_rw_build
configure_rw "$OLD_VERSION" true
else
set -e
tar -xvf risingwave-v"${OLD_VERSION}"-x86_64-unknown-linux.tar.gz
mv risingwave target/debug/risingwave

echo "--- Start cluster on tag $OLD_VERSION"
git config --global --add safe.directory /risingwave
configure_rw "$OLD_VERSION"
configure_rw "$OLD_VERSION" false
fi
}

Expand All @@ -148,8 +116,8 @@ main() {
# Assume we use the latest version, so we just set to some large number.
# The current $NEW_VERSION as of this change is 1.7.0, so we can't use that.
# See: https://github.com/risingwavelabs/risingwave/pull/15448
configure_rw "99.99.99"
configure_rw "99.99.99" false
validate_new_cluster "$NEW_VERSION"
}

main
main
1 change: 1 addition & 0 deletions ci/scripts/run-backfill-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# 1002 | CREATE MATERIALIZED VIEW m1 AS SELECT * FROM t | 56.12% | 2023-09-27 06:37:06.636+00:00
#(1 row)

# TODO: refactor with inline style.

set -euo pipefail

Expand Down
2 changes: 1 addition & 1 deletion ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ steps:
- "build"
plugins:
- docker-compose#v5.1.0:
run: rw-build-env
run: source-test-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
Expand Down
2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ steps:
- "build"
plugins:
- docker-compose#v5.1.0:
run: rw-build-env
run: source-test-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
Expand Down
22 changes: 12 additions & 10 deletions src/risedevtool/src/task/kafka_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ impl KafkaService {
impl Task for KafkaService {
fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()> {
ctx.service(self);

if self.config.user_managed {
ctx.pb.set_message("user managed");
writeln!(
&mut ctx.log,
"Please start your Kafka at {}:{}\n\n",
self.config.address, self.config.port
)?;
return Ok(());
}

ctx.pb.set_message("starting...");

let path = self.kafka_path()?;
Expand Down Expand Up @@ -74,16 +85,7 @@ impl Task for KafkaService {

cmd.arg(config_path);

if !self.config.user_managed {
ctx.run_command(ctx.tmux_run(cmd)?)?;
} else {
ctx.pb.set_message("user managed");
writeln!(
&mut ctx.log,
"Please start your Kafka at {}:{}\n\n",
self.config.listen_address, self.config.port
)?;
}
ctx.run_command(ctx.tmux_run(cmd)?)?;

ctx.pb.set_message("started");

Expand Down

0 comments on commit b21e9f5

Please sign in to comment.