diff --git a/backwards-compat-tests/scripts/run_local.sh b/backwards-compat-tests/scripts/run_local.sh index 76b1a4f333458..20e820ef3e092 100755 --- a/backwards-compat-tests/scripts/run_local.sh +++ b/backwards-compat-tests/scripts/run_local.sh @@ -23,8 +23,10 @@ full-without-monitoring: - use: compute-node - use: frontend - use: compactor - - use: zookeeper - use: kafka + user-managed: true + address: message_queue + port: 29092 EOF cat < risedev-components.user.env @@ -32,7 +34,6 @@ RISEDEV_CONFIGURED=false ENABLE_MINIO=true ENABLE_ETCD=true -ENABLE_KAFKA=true # Fetch risingwave binary from release. ENABLE_BUILD_RUST=true @@ -65,4 +66,4 @@ main() { validate_new_cluster "$NEW_VERSION" } -main \ No newline at end of file +main diff --git a/backwards-compat-tests/scripts/utils.sh b/backwards-compat-tests/scripts/utils.sh index ad4fd7faae9df..6b5184b2bb0ea 100644 --- a/backwards-compat-tests/scripts/utils.sh +++ b/backwards-compat-tests/scripts/utils.sh @@ -19,36 +19,9 @@ RECOVERY_DURATION=20 # Setup test directory TEST_DIR=.risingwave/backwards-compat-tests/ -KAFKA_PATH=.risingwave/bin/kafka mkdir -p $TEST_DIR cp -r backwards-compat-tests/slt/* $TEST_DIR -wait_kafka_exit() { - # Follow kafka-server-stop.sh - while [[ -n "$(ps ax | grep ' kafka\.Kafka ' | grep java | grep -v grep | awk '{print $1}')" ]]; do - echo "Waiting for kafka to exit" - sleep 1 - done -} - -wait_zookeeper_exit() { - # Follow zookeeper-server-stop.sh - while [[ -n "$(ps ax | grep java | grep -i QuorumPeerMain | grep -v grep | awk '{print $1}')" ]]; do - echo "Waiting for zookeeper to exit" - sleep 1 - done -} - -kill_kafka() { - $KAFKA_PATH/bin/kafka-server-stop.sh - wait_kafka_exit -} - -kill_zookeeper() { - $KAFKA_PATH/bin/zookeeper-server-stop.sh - wait_zookeeper_exit -} - wait_for_process() { process_name="$1" @@ -77,21 +50,9 @@ kill_cluster() { # Kill other components $TMUX list-windows -t risedev -F "#{window_name} #{pane_id}" | - grep -v 'kafka' | - grep -v 'zookeeper' | awk '{ print $2 }' | xargs -I {} $TMUX send-keys -t {} C-c C-d - set +e - if [[ -n $($TMUX list-windows -t risedev | grep kafka) ]]; then - echo "kill kafka" - kill_kafka - - echo "kill zookeeper" - kill_zookeeper - fi - set -e - $TMUX kill-server test $? -eq 0 || { echo "Failed to stop all RiseDev components." @@ -117,18 +78,16 @@ check_version() { } create_kafka_topic() { - "$KAFKA_PATH"/bin/kafka-topics.sh \ - --create \ - --topic backwards_compat_test_kafka_source --bootstrap-server localhost:29092 + RPK_BROKERS=message_queue:29092 \ + rpk topic create backwards_compat_test_kafka_source } insert_json_kafka() { local JSON=$1 - echo "$JSON" | "$KAFKA_PATH"/bin/kafka-console-producer.sh \ - --topic backwards_compat_test_kafka_source \ - --bootstrap-server localhost:29092 \ - --property "parse.key=true" \ - --property "key.separator=," + + echo "$JSON" | \ + RPK_BROKERS=message_queue:29092 \ + rpk topic produce backwards_compat_test_kafka_source -f "%k,%v" } seed_json_kafka() { diff --git a/backwards-compat-tests/slt/kafka/invalid_options/seed.slt b/backwards-compat-tests/slt/kafka/invalid_options/seed.slt index 6021cf1253b8e..2fef943c8ab49 100644 --- a/backwards-compat-tests/slt/kafka/invalid_options/seed.slt +++ b/backwards-compat-tests/slt/kafka/invalid_options/seed.slt @@ -14,7 +14,7 @@ CREATE SOURCE IF NOT EXISTS kafka_source_with_invalid_option WITH ( connector='kafka', topic='backwards_compat_test_kafka_source', - properties.bootstrap.server='localhost:29092', + properties.bootstrap.server='message_queue:29092', scan.startup.mode='earliest', invalid_option='oops' ) FORMAT PLAIN ENCODE JSON; diff --git a/backwards-compat-tests/slt/kafka/seed.slt b/backwards-compat-tests/slt/kafka/seed.slt index 3840ce0c96b15..aa2a196694e4d 100644 --- a/backwards-compat-tests/slt/kafka/seed.slt +++ b/backwards-compat-tests/slt/kafka/seed.slt @@ -11,7 +11,7 @@ CREATE SOURCE IF NOT EXISTS kafka_source WITH ( connector='kafka', topic='backwards_compat_test_kafka_source', - properties.bootstrap.server='localhost:29092', + properties.bootstrap.server='message_queue:29092', scan.startup.mode='earliest', ) FORMAT PLAIN ENCODE JSON; diff --git a/backwards-compat-tests/slt/kafka/upsert/deprecate_upsert.slt b/backwards-compat-tests/slt/kafka/upsert/deprecate_upsert.slt index 55cfce886455d..57f1e2d27391b 100644 --- a/backwards-compat-tests/slt/kafka/upsert/deprecate_upsert.slt +++ b/backwards-compat-tests/slt/kafka/upsert/deprecate_upsert.slt @@ -11,6 +11,6 @@ CREATE TABLE IF NOT EXISTS kafka_table WITH ( connector='kafka', topic='backwards_compat_test_kafka_source', - properties.bootstrap.server='localhost:29092', + properties.bootstrap.server='message_queue:29092', scan.startup.mode='earliest', -) FORMAT UPSERT ENCODE JSON; \ No newline at end of file +) FORMAT UPSERT ENCODE JSON; diff --git a/backwards-compat-tests/slt/kafka/upsert/include_key_as.slt b/backwards-compat-tests/slt/kafka/upsert/include_key_as.slt index 36ef426574223..37d41e195ee05 100644 --- a/backwards-compat-tests/slt/kafka/upsert/include_key_as.slt +++ b/backwards-compat-tests/slt/kafka/upsert/include_key_as.slt @@ -13,6 +13,6 @@ INCLUDE key as _rw_key WITH ( connector='kafka', topic='backwards_compat_test_kafka_source', - properties.bootstrap.server='localhost:29092', + properties.bootstrap.server='message_queue:29092', scan.startup.mode='earliest', -) FORMAT UPSERT ENCODE JSON; \ No newline at end of file +) FORMAT UPSERT ENCODE JSON; diff --git a/ci/scripts/backwards-compat-test.sh b/ci/scripts/backwards-compat-test.sh index d539748a23fd9..3ffb3fb8284a8 100755 --- a/ci/scripts/backwards-compat-test.sh +++ b/ci/scripts/backwards-compat-test.sh @@ -38,6 +38,7 @@ source backwards-compat-tests/scripts/utils.sh configure_rw() { VERSION="$1" +ENABLE_BUILD="$2" echo "--- Setting up cluster config" cat < risedev-profiles.user.yml @@ -49,8 +50,6 @@ full-without-monitoring: - use: compute-node - use: frontend - use: compactor - - use: zookeeper - - use: kafka EOF cat < risedev-components.user.env @@ -58,10 +57,9 @@ RISEDEV_CONFIGURED=true ENABLE_MINIO=true ENABLE_ETCD=true -ENABLE_KAFKA=true -# Fetch risingwave binary from release. -ENABLE_BUILD_RUST=false +# Whether to build or directly fetch binary from release. +ENABLE_BUILD_RUST=$ENABLE_BUILD # Use target/debug for simplicity. ENABLE_RELEASE_PROFILE=false @@ -73,36 +71,6 @@ if version_le "${VERSION:-}" "1.8.0" ; then fi } -configure_rw_build() { -echo "--- Setting up cluster config" -cat < risedev-profiles.user.yml -full-without-monitoring: - steps: - - use: minio - - use: etcd - - use: meta-node - - use: compute-node - - use: frontend - - use: compactor - - use: zookeeper - - use: kafka -EOF - -cat < risedev-components.user.env -RISEDEV_CONFIGURED=true - -ENABLE_MINIO=true -ENABLE_ETCD=true -ENABLE_KAFKA=true - -# Make sure that it builds -ENABLE_BUILD_RUST=true - -# Use target/debug for simplicity. -ENABLE_RELEASE_PROFILE=false -EOF -} - setup_old_cluster() { echo "--- Build risedev for $OLD_VERSION, it may not be backwards compatible" git config --global --add safe.directory /risingwave @@ -115,7 +83,7 @@ setup_old_cluster() { if [[ "$?" -ne 0 ]]; then set -e echo "Failed to download ${OLD_VERSION} from github releases, build from source later during \`risedev d\`" - configure_rw_build + configure_rw "$OLD_VERSION" true else set -e tar -xvf risingwave-v"${OLD_VERSION}"-x86_64-unknown-linux.tar.gz @@ -123,7 +91,7 @@ setup_old_cluster() { echo "--- Start cluster on tag $OLD_VERSION" git config --global --add safe.directory /risingwave - configure_rw "$OLD_VERSION" + configure_rw "$OLD_VERSION" false fi } @@ -148,8 +116,8 @@ main() { # Assume we use the latest version, so we just set to some large number. # The current $NEW_VERSION as of this change is 1.7.0, so we can't use that. # See: https://github.com/risingwavelabs/risingwave/pull/15448 - configure_rw "99.99.99" + configure_rw "99.99.99" false validate_new_cluster "$NEW_VERSION" } -main \ No newline at end of file +main diff --git a/ci/scripts/run-backfill-tests.sh b/ci/scripts/run-backfill-tests.sh index f01e2c5b3d771..b641065853337 100755 --- a/ci/scripts/run-backfill-tests.sh +++ b/ci/scripts/run-backfill-tests.sh @@ -13,6 +13,7 @@ # 1002 | CREATE MATERIALIZED VIEW m1 AS SELECT * FROM t | 56.12% | 2023-09-27 06:37:06.636+00:00 #(1 row) +# TODO: refactor with inline style. set -euo pipefail diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index ba3a047058256..30881275cf271 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -654,7 +654,7 @@ steps: - "build" plugins: - docker-compose#v5.1.0: - run: rw-build-env + run: source-test-env config: ci/docker-compose.yml mount-buildkite-agent: true - ./ci/plugins/upload-failure-logs diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index 10e57f6fee825..475b981a18b07 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -642,7 +642,7 @@ steps: - "build" plugins: - docker-compose#v5.1.0: - run: rw-build-env + run: source-test-env config: ci/docker-compose.yml mount-buildkite-agent: true - ./ci/plugins/upload-failure-logs diff --git a/src/risedevtool/src/task/kafka_service.rs b/src/risedevtool/src/task/kafka_service.rs index df0eb4a0fa313..ec3a54599c509 100644 --- a/src/risedevtool/src/task/kafka_service.rs +++ b/src/risedevtool/src/task/kafka_service.rs @@ -46,6 +46,17 @@ impl KafkaService { impl Task for KafkaService { fn execute(&mut self, ctx: &mut ExecuteContext) -> anyhow::Result<()> { ctx.service(self); + + if self.config.user_managed { + ctx.pb.set_message("user managed"); + writeln!( + &mut ctx.log, + "Please start your Kafka at {}:{}\n\n", + self.config.address, self.config.port + )?; + return Ok(()); + } + ctx.pb.set_message("starting..."); let path = self.kafka_path()?; @@ -74,16 +85,7 @@ impl Task for KafkaService { cmd.arg(config_path); - if !self.config.user_managed { - ctx.run_command(ctx.tmux_run(cmd)?)?; - } else { - ctx.pb.set_message("user managed"); - writeln!( - &mut ctx.log, - "Please start your Kafka at {}:{}\n\n", - self.config.listen_address, self.config.port - )?; - } + ctx.run_command(ctx.tmux_run(cmd)?)?; ctx.pb.set_message("started");