From c47799f150d996f771dbcac7bb2f6b28d632be67 Mon Sep 17 00:00:00 2001 From: Xiangjin Date: Tue, 24 Oct 2023 00:04:04 +0800 Subject: [PATCH] ci(sink): as source, start kafka with docker-compose rather than risedev --- ci/docker-compose.yml | 1 + ci/scripts/e2e-kafka-sink-test.sh | 40 ++++++++++++++--------------- ci/scripts/e2e-sink-test.sh | 2 +- e2e_test/sink/kafka/create_sink.slt | 22 ++++++++-------- e2e_test/sink/kafka/protobuf.slt | 10 ++++---- risedev.yml | 34 ------------------------ 6 files changed, 38 insertions(+), 71 deletions(-) diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index 6fe7cfbfdeca2..1d44ea60ce216 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -85,6 +85,7 @@ services: depends_on: - mysql - db + - message_queue - elasticsearch - clickhouse-server - pulsar diff --git a/ci/scripts/e2e-kafka-sink-test.sh b/ci/scripts/e2e-kafka-sink-test.sh index 06ef185f46e8b..71a91f2d8fba9 100755 --- a/ci/scripts/e2e-kafka-sink-test.sh +++ b/ci/scripts/e2e-kafka-sink-test.sh @@ -3,10 +3,10 @@ # Exits as soon as any line fails. set -euo pipefail -./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-append-only --create > /dev/null 2>&1 -./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert --create > /dev/null 2>&1 -./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert-schema --create > /dev/null 2>&1 -./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --create > /dev/null 2>&1 +./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only --create > /dev/null 2>&1 +./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --create > /dev/null 2>&1 +./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --create > /dev/null 2>&1 +./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --create > /dev/null 2>&1 sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/create_sink.slt' sleep 2 @@ -14,7 +14,7 @@ sleep 2 # test append-only kafka sink echo "testing append-only kafka sink" diff ./e2e_test/sink/kafka/append_only1.result \ -<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-append-only --from-beginning --max-messages 10 | sort) 2> /dev/null) +<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only --from-beginning --max-messages 10 | sort) 2> /dev/null) if [ $? -ne 0 ]; then echo "The output for append-only sink is not as expected." exit 1 @@ -23,7 +23,7 @@ fi # test upsert kafka sink echo "testing upsert kafka sink" diff ./e2e_test/sink/kafka/upsert1.result \ -<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 10 | sort) 2> /dev/null) +<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 10 | sort) 2> /dev/null) if [ $? -ne 0 ]; then echo "The output for upsert sink is not as expected." exit 1 @@ -32,7 +32,7 @@ fi # test upsert kafka sink with schema echo "testing upsert kafka sink with schema" diff ./e2e_test/sink/kafka/upsert_schema1.result \ -<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 10 | sort) 2> /dev/null) +<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 10 | sort) 2> /dev/null) if [ $? -ne 0 ]; then echo "The output for upsert sink with schema is not as expected." exit 1 @@ -40,7 +40,7 @@ fi # test debezium kafka sink echo "testing debezium kafka sink" -(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 10 | sort) > ./e2e_test/sink/kafka/debezium1.tmp.result 2> /dev/null +(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 10 | sort) > ./e2e_test/sink/kafka/debezium1.tmp.result 2> /dev/null python3 e2e_test/sink/kafka/debezium.py e2e_test/sink/kafka/debezium1.result e2e_test/sink/kafka/debezium1.tmp.result if [ $? -ne 0 ]; then echo "The output for debezium sink is not as expected." @@ -57,7 +57,7 @@ psql -h localhost -p 4566 -d dev -U root -c "update t_kafka set v_varchar = '', # test append-only kafka sink after update echo "testing append-only kafka sink after updating data" diff ./e2e_test/sink/kafka/append_only2.result \ -<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-append-only --from-beginning --max-messages 11 | sort) 2> /dev/null) +<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only --from-beginning --max-messages 11 | sort) 2> /dev/null) if [ $? -ne 0 ]; then echo "The output for append-only sink after update is not as expected." exit 1 @@ -66,7 +66,7 @@ fi # test upsert kafka sink after update echo "testing upsert kafka sink after updating data" diff ./e2e_test/sink/kafka/upsert2.result \ -<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 11 | sort) 2> /dev/null) +<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 11 | sort) 2> /dev/null) if [ $? -ne 0 ]; then echo "The output for upsert sink after update is not as expected." exit 1 @@ -75,7 +75,7 @@ fi # test upsert kafka sink with schema after update echo "testing upsert kafka sink with schema after updating data" diff ./e2e_test/sink/kafka/upsert_schema2.result \ -<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 11 | sort) 2> /dev/null) +<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 11 | sort) 2> /dev/null) if [ $? -ne 0 ]; then echo "The output for upsert sink with schema is not as expected." exit 1 @@ -83,7 +83,7 @@ fi # test debezium kafka sink after update echo "testing debezium kafka sink after updating data" -(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 11 | sort) > ./e2e_test/sink/kafka/debezium2.tmp.result 2> /dev/null +(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 11 | sort) > ./e2e_test/sink/kafka/debezium2.tmp.result 2> /dev/null python3 e2e_test/sink/kafka/debezium.py e2e_test/sink/kafka/debezium2.result e2e_test/sink/kafka/debezium2.tmp.result if [ $? -ne 0 ]; then echo "The output for debezium sink after update is not as expected." @@ -100,7 +100,7 @@ psql -h localhost -p 4566 -d dev -U root -c "delete from t_kafka where id = 1;" # test upsert kafka sink after delete echo "testing upsert kafka sink after deleting data" diff ./e2e_test/sink/kafka/upsert3.result \ -<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 12 | sort) 2> /dev/null) +<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 12 | sort) 2> /dev/null) if [ $? -ne 0 ]; then echo "The output for upsert sink after update is not as expected." exit 1 @@ -109,7 +109,7 @@ fi # test upsert kafka sink with schema after delete echo "testing upsert kafka sink with schema after deleting data" diff ./e2e_test/sink/kafka/upsert_schema3.result \ -<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 12 | sort) 2> /dev/null) +<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 12 | sort) 2> /dev/null) if [ $? -ne 0 ]; then echo "The output for upsert sink with schema is not as expected." exit 1 @@ -117,7 +117,7 @@ fi # test debezium kafka sink after delete echo "testing debezium kafka sink after deleting data" -(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 13 | sort) > ./e2e_test/sink/kafka/debezium3.tmp.result 2> /dev/null +(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 13 | sort) > ./e2e_test/sink/kafka/debezium3.tmp.result 2> /dev/null python3 e2e_test/sink/kafka/debezium.py e2e_test/sink/kafka/debezium3.result e2e_test/sink/kafka/debezium3.tmp.result if [ $? -ne 0 ]; then echo "The output for debezium sink after delete is not as expected." @@ -128,13 +128,13 @@ else fi sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/drop_sink.slt' -./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-append-only --delete > /dev/null 2>&1 -./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert --delete > /dev/null 2>&1 -./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --delete > /dev/null 2>&1 +./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only --delete > /dev/null 2>&1 +./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --delete > /dev/null 2>&1 +./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --delete > /dev/null 2>&1 # test different encoding echo "testing protobuf" cp src/connector/src/test_data/proto_recursive/recursive.pb ./proto-recursive -./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-append-only-protobuf --create > /dev/null 2>&1 +./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --create > /dev/null 2>&1 sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/protobuf.slt' -./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-append-only-protobuf --delete > /dev/null 2>&1 +./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --delete > /dev/null 2>&1 diff --git a/ci/scripts/e2e-sink-test.sh b/ci/scripts/e2e-sink-test.sh index 2dc02f0eada7a..ce2cc46381eba 100755 --- a/ci/scripts/e2e-sink-test.sh +++ b/ci/scripts/e2e-sink-test.sh @@ -57,7 +57,7 @@ node_port=50051 node_timeout=10 echo "--- starting risingwave cluster with connector node" -cargo make ci-start ci-kafka +cargo make ci-start ci-1cn-1fe ./connector-node/start-service.sh -p $node_port > .risingwave/log/connector-node.log 2>&1 & echo "waiting for connector node to start" diff --git a/e2e_test/sink/kafka/create_sink.slt b/e2e_test/sink/kafka/create_sink.slt index 25e3a59fdff3a..a1f296774f526 100644 --- a/e2e_test/sink/kafka/create_sink.slt +++ b/e2e_test/sink/kafka/create_sink.slt @@ -31,7 +31,7 @@ create connection mock with ( statement error create sink si_kafka_append_only_conn from t_kafka with ( connector = 'kafka', - properties.bootstrap.server = '127.0.0.1:29092', + properties.bootstrap.server = 'message_queue:29092', topic = 'test-rw-sink-append-only', type = 'append-only', force_append_only = 'true', @@ -42,7 +42,7 @@ create sink si_kafka_append_only_conn from t_kafka with ( statement ok create sink si_kafka_append_only_conn from t_kafka with ( connector = 'kafka', - properties.bootstrap.server = '127.0.0.1:29092', + properties.bootstrap.server = 'message_queue:29092', topic = 'test-rw-sink-append-only', type = 'append-only', force_append_only = 'true', @@ -66,7 +66,7 @@ drop connection mock; statement error sink cannot be append-only create sink si_kafka_append_only from t_kafka with ( connector = 'kafka', - properties.bootstrap.server = '127.0.0.1:29092', + properties.bootstrap.server = 'message_queue:29092', topic = 'test-rw-sink-append-only', type = 'append-only', ); @@ -74,7 +74,7 @@ create sink si_kafka_append_only from t_kafka with ( statement ok create sink si_kafka_append_only from t_kafka with ( connector = 'kafka', - properties.bootstrap.server = '127.0.0.1:29092', + properties.bootstrap.server = 'message_queue:29092', topic = 'test-rw-sink-append-only', type = 'append-only', force_append_only = 'true' @@ -83,7 +83,7 @@ create sink si_kafka_append_only from t_kafka with ( statement error primary key not defined create sink si_kafka_upsert from t_kafka with ( connector = 'kafka', - properties.bootstrap.server = '127.0.0.1:29092', + properties.bootstrap.server = 'message_queue:29092', topic = 'test-rw-sink-upsert', type = 'upsert', ); @@ -91,7 +91,7 @@ create sink si_kafka_upsert from t_kafka with ( statement ok create sink si_kafka_upsert from t_kafka with ( connector = 'kafka', - properties.bootstrap.server = '127.0.0.1:29092', + properties.bootstrap.server = 'message_queue:29092', topic = 'test-rw-sink-upsert', type = 'upsert', primary_key = 'id', @@ -100,7 +100,7 @@ create sink si_kafka_upsert from t_kafka with ( statement ok create sink si_kafka_upsert_schema from t_kafka with ( connector = 'kafka', - properties.bootstrap.server = '127.0.0.1:29092', + properties.bootstrap.server = 'message_queue:29092', topic = 'test-rw-sink-upsert-schema', primary_key = 'id', ) format upsert encode json ( @@ -110,7 +110,7 @@ create sink si_kafka_upsert_schema from t_kafka with ( statement ok create sink si_kafka_debezium from t_kafka with ( connector = 'kafka', - properties.bootstrap.server = '127.0.0.1:29092', + properties.bootstrap.server = 'message_queue:29092', topic = 'test-rw-sink-debezium', type = 'debezium', primary_key = 'id', @@ -119,7 +119,7 @@ create sink si_kafka_debezium from t_kafka with ( statement error primary key not defined create sink debezium_without_pk from t_kafka with ( connector = 'kafka', - properties.bootstrap.server = '127.0.0.1:29092', + properties.bootstrap.server = 'message_queue:29092', topic = 'test-rw-sink-debezium', type = 'debezium', ); @@ -127,7 +127,7 @@ create sink debezium_without_pk from t_kafka with ( statement ok create sink multiple_pk from t_kafka with ( connector = 'kafka', - properties.bootstrap.server = '127.0.0.1:29092', + properties.bootstrap.server = 'message_queue:29092', topic = 'test-rw-sink-debezium', type = 'debezium', primary_key = 'id,v_varchar' @@ -139,7 +139,7 @@ drop sink multiple_pk; statement error Sink primary key column not found: invalid. create sink invalid_pk_column from t_kafka with ( connector = 'kafka', - properties.bootstrap.server = '127.0.0.1:29092', + properties.bootstrap.server = 'message_queue:29092', topic = 'test-rw-sink-debezium', type = 'debezium', primary_key = 'id,invalid' diff --git a/e2e_test/sink/kafka/protobuf.slt b/e2e_test/sink/kafka/protobuf.slt index f69c4a9d07110..87ab884eddbde 100644 --- a/e2e_test/sink/kafka/protobuf.slt +++ b/e2e_test/sink/kafka/protobuf.slt @@ -2,7 +2,7 @@ statement ok create table from_kafka with ( connector = 'kafka', topic = 'test-rw-sink-append-only-protobuf', - properties.bootstrap.server = '127.0.0.1:29092') + properties.bootstrap.server = 'message_queue:29092') format plain encode protobuf ( schema.location = 'file:///risingwave/proto-recursive', message = 'recursive.AllTypes'); @@ -37,7 +37,7 @@ statement ok create sink sink0 from into_kafka with ( connector = 'kafka', topic = 'test-rw-sink-append-only-protobuf', - properties.bootstrap.server = '127.0.0.1:29092') + properties.bootstrap.server = 'message_queue:29092') format plain encode protobuf ( force_append_only = true, schema.location = 'file:///risingwave/proto-recursive', @@ -70,7 +70,7 @@ statement error failed to read file create sink sink_err from into_kafka with ( connector = 'kafka', topic = 'test-rw-sink-append-only-protobuf', - properties.bootstrap.server = '127.0.0.1:29092') + properties.bootstrap.server = 'message_queue:29092') format plain encode protobuf ( force_append_only = true, schema.location = 'file:///risingwave/proto-recursiv', @@ -80,7 +80,7 @@ statement error encode extra_column error: field not in proto create sink sink_err as select 1 as extra_column with ( connector = 'kafka', topic = 'test-rw-sink-append-only-protobuf', - properties.bootstrap.server = '127.0.0.1:29092') + properties.bootstrap.server = 'message_queue:29092') format plain encode protobuf ( force_append_only = true, schema.location = 'file:///risingwave/proto-recursive', @@ -90,7 +90,7 @@ statement error s3 URL not supported yet create sink sink_err from into_kafka with ( connector = 'kafka', topic = 'test-rw-sink-append-only-protobuf', - properties.bootstrap.server = '127.0.0.1:29092') + properties.bootstrap.server = 'message_queue:29092') format plain encode protobuf ( force_append_only = true, schema.location = 's3:///risingwave/proto-recursive', diff --git a/risedev.yml b/risedev.yml index a5ba8a7b43f97..135a33f602a6a 100644 --- a/risedev.yml +++ b/risedev.yml @@ -685,40 +685,6 @@ profile: - use: pubsub persist-data: true - ci-kafka: - config-path: src/config/ci.toml - steps: - - use: minio - - use: etcd - unsafe-no-fsync: true - - use: meta-node - - use: compute-node - enable-tiered-cache: true - - use: frontend - - use: compactor - - use: zookeeper - persist-data: true - - use: kafka - persist-data: true - - ci-kafka-plus-pubsub: - config-path: src/config/ci.toml - steps: - - use: minio - - use: etcd - unsafe-no-fsync: true - - use: meta-node - - use: compute-node - enable-tiered-cache: true - - use: frontend - - use: compactor - - use: zookeeper - persist-data: true - - use: kafka - persist-data: true - - use: pubsub - persist-data: true - ci-redis: config-path: src/config/ci.toml steps: