Skip to content

Commit

Permalink
ci(sink): as source, start kafka with docker-compose rather than risedev
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu committed Oct 23, 2023
1 parent 8f981f3 commit c47799f
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 71 deletions.
1 change: 1 addition & 0 deletions ci/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ services:
depends_on:
- mysql
- db
- message_queue
- elasticsearch
- clickhouse-server
- pulsar
Expand Down
40 changes: 20 additions & 20 deletions ci/scripts/e2e-kafka-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@
# Exits as soon as any line fails.
set -euo pipefail

./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-append-only --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert-schema --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --create > /dev/null 2>&1

sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/create_sink.slt'
sleep 2

# test append-only kafka sink
echo "testing append-only kafka sink"
diff ./e2e_test/sink/kafka/append_only1.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-append-only --from-beginning --max-messages 10 | sort) 2> /dev/null)
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only --from-beginning --max-messages 10 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for append-only sink is not as expected."
exit 1
Expand All @@ -23,7 +23,7 @@ fi
# test upsert kafka sink
echo "testing upsert kafka sink"
diff ./e2e_test/sink/kafka/upsert1.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 10 | sort) 2> /dev/null)
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 10 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink is not as expected."
exit 1
Expand All @@ -32,15 +32,15 @@ fi
# test upsert kafka sink with schema
echo "testing upsert kafka sink with schema"
diff ./e2e_test/sink/kafka/upsert_schema1.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 10 | sort) 2> /dev/null)
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 10 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink with schema is not as expected."
exit 1
fi

# test debezium kafka sink
echo "testing debezium kafka sink"
(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 10 | sort) > ./e2e_test/sink/kafka/debezium1.tmp.result 2> /dev/null
(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 10 | sort) > ./e2e_test/sink/kafka/debezium1.tmp.result 2> /dev/null
python3 e2e_test/sink/kafka/debezium.py e2e_test/sink/kafka/debezium1.result e2e_test/sink/kafka/debezium1.tmp.result
if [ $? -ne 0 ]; then
echo "The output for debezium sink is not as expected."
Expand All @@ -57,7 +57,7 @@ psql -h localhost -p 4566 -d dev -U root -c "update t_kafka set v_varchar = '',
# test append-only kafka sink after update
echo "testing append-only kafka sink after updating data"
diff ./e2e_test/sink/kafka/append_only2.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-append-only --from-beginning --max-messages 11 | sort) 2> /dev/null)
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only --from-beginning --max-messages 11 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for append-only sink after update is not as expected."
exit 1
Expand All @@ -66,7 +66,7 @@ fi
# test upsert kafka sink after update
echo "testing upsert kafka sink after updating data"
diff ./e2e_test/sink/kafka/upsert2.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 11 | sort) 2> /dev/null)
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 11 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink after update is not as expected."
exit 1
Expand All @@ -75,15 +75,15 @@ fi
# test upsert kafka sink with schema after update
echo "testing upsert kafka sink with schema after updating data"
diff ./e2e_test/sink/kafka/upsert_schema2.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 11 | sort) 2> /dev/null)
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 11 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink with schema is not as expected."
exit 1
fi

# test debezium kafka sink after update
echo "testing debezium kafka sink after updating data"
(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 11 | sort) > ./e2e_test/sink/kafka/debezium2.tmp.result 2> /dev/null
(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 11 | sort) > ./e2e_test/sink/kafka/debezium2.tmp.result 2> /dev/null
python3 e2e_test/sink/kafka/debezium.py e2e_test/sink/kafka/debezium2.result e2e_test/sink/kafka/debezium2.tmp.result
if [ $? -ne 0 ]; then
echo "The output for debezium sink after update is not as expected."
Expand All @@ -100,7 +100,7 @@ psql -h localhost -p 4566 -d dev -U root -c "delete from t_kafka where id = 1;"
# test upsert kafka sink after delete
echo "testing upsert kafka sink after deleting data"
diff ./e2e_test/sink/kafka/upsert3.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 12 | sort) 2> /dev/null)
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 12 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink after update is not as expected."
exit 1
Expand All @@ -109,15 +109,15 @@ fi
# test upsert kafka sink with schema after delete
echo "testing upsert kafka sink with schema after deleting data"
diff ./e2e_test/sink/kafka/upsert_schema3.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 12 | sort) 2> /dev/null)
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 12 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink with schema is not as expected."
exit 1
fi

# test debezium kafka sink after delete
echo "testing debezium kafka sink after deleting data"
(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 13 | sort) > ./e2e_test/sink/kafka/debezium3.tmp.result 2> /dev/null
(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 13 | sort) > ./e2e_test/sink/kafka/debezium3.tmp.result 2> /dev/null
python3 e2e_test/sink/kafka/debezium.py e2e_test/sink/kafka/debezium3.result e2e_test/sink/kafka/debezium3.tmp.result
if [ $? -ne 0 ]; then
echo "The output for debezium sink after delete is not as expected."
Expand All @@ -128,13 +128,13 @@ else
fi

sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/drop_sink.slt'
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-append-only --delete > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert --delete > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --delete > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only --delete > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --delete > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --delete > /dev/null 2>&1

# test different encoding
echo "testing protobuf"
cp src/connector/src/test_data/proto_recursive/recursive.pb ./proto-recursive
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-append-only-protobuf --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --create > /dev/null 2>&1
sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/protobuf.slt'
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-append-only-protobuf --delete > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --delete > /dev/null 2>&1
2 changes: 1 addition & 1 deletion ci/scripts/e2e-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ node_port=50051
node_timeout=10

echo "--- starting risingwave cluster with connector node"
cargo make ci-start ci-kafka
cargo make ci-start ci-1cn-1fe
./connector-node/start-service.sh -p $node_port > .risingwave/log/connector-node.log 2>&1 &

echo "waiting for connector node to start"
Expand Down
22 changes: 11 additions & 11 deletions e2e_test/sink/kafka/create_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ create connection mock with (
statement error
create sink si_kafka_append_only_conn from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = '127.0.0.1:29092',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-append-only',
type = 'append-only',
force_append_only = 'true',
Expand All @@ -42,7 +42,7 @@ create sink si_kafka_append_only_conn from t_kafka with (
statement ok
create sink si_kafka_append_only_conn from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = '127.0.0.1:29092',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-append-only',
type = 'append-only',
force_append_only = 'true',
Expand All @@ -66,15 +66,15 @@ drop connection mock;
statement error sink cannot be append-only
create sink si_kafka_append_only from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = '127.0.0.1:29092',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-append-only',
type = 'append-only',
);

statement ok
create sink si_kafka_append_only from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = '127.0.0.1:29092',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-append-only',
type = 'append-only',
force_append_only = 'true'
Expand All @@ -83,15 +83,15 @@ create sink si_kafka_append_only from t_kafka with (
statement error primary key not defined
create sink si_kafka_upsert from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = '127.0.0.1:29092',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-upsert',
type = 'upsert',
);

statement ok
create sink si_kafka_upsert from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = '127.0.0.1:29092',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-upsert',
type = 'upsert',
primary_key = 'id',
Expand All @@ -100,7 +100,7 @@ create sink si_kafka_upsert from t_kafka with (
statement ok
create sink si_kafka_upsert_schema from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = '127.0.0.1:29092',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-upsert-schema',
primary_key = 'id',
) format upsert encode json (
Expand All @@ -110,7 +110,7 @@ create sink si_kafka_upsert_schema from t_kafka with (
statement ok
create sink si_kafka_debezium from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = '127.0.0.1:29092',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-debezium',
type = 'debezium',
primary_key = 'id',
Expand All @@ -119,15 +119,15 @@ create sink si_kafka_debezium from t_kafka with (
statement error primary key not defined
create sink debezium_without_pk from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = '127.0.0.1:29092',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-debezium',
type = 'debezium',
);

statement ok
create sink multiple_pk from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = '127.0.0.1:29092',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-debezium',
type = 'debezium',
primary_key = 'id,v_varchar'
Expand All @@ -139,7 +139,7 @@ drop sink multiple_pk;
statement error Sink primary key column not found: invalid.
create sink invalid_pk_column from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = '127.0.0.1:29092',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-debezium',
type = 'debezium',
primary_key = 'id,invalid'
Expand Down
10 changes: 5 additions & 5 deletions e2e_test/sink/kafka/protobuf.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ statement ok
create table from_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-append-only-protobuf',
properties.bootstrap.server = '127.0.0.1:29092')
properties.bootstrap.server = 'message_queue:29092')
format plain encode protobuf (
schema.location = 'file:///risingwave/proto-recursive',
message = 'recursive.AllTypes');
Expand Down Expand Up @@ -37,7 +37,7 @@ statement ok
create sink sink0 from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-append-only-protobuf',
properties.bootstrap.server = '127.0.0.1:29092')
properties.bootstrap.server = 'message_queue:29092')
format plain encode protobuf (
force_append_only = true,
schema.location = 'file:///risingwave/proto-recursive',
Expand Down Expand Up @@ -70,7 +70,7 @@ statement error failed to read file
create sink sink_err from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-append-only-protobuf',
properties.bootstrap.server = '127.0.0.1:29092')
properties.bootstrap.server = 'message_queue:29092')
format plain encode protobuf (
force_append_only = true,
schema.location = 'file:///risingwave/proto-recursiv',
Expand All @@ -80,7 +80,7 @@ statement error encode extra_column error: field not in proto
create sink sink_err as select 1 as extra_column with (
connector = 'kafka',
topic = 'test-rw-sink-append-only-protobuf',
properties.bootstrap.server = '127.0.0.1:29092')
properties.bootstrap.server = 'message_queue:29092')
format plain encode protobuf (
force_append_only = true,
schema.location = 'file:///risingwave/proto-recursive',
Expand All @@ -90,7 +90,7 @@ statement error s3 URL not supported yet
create sink sink_err from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-append-only-protobuf',
properties.bootstrap.server = '127.0.0.1:29092')
properties.bootstrap.server = 'message_queue:29092')
format plain encode protobuf (
force_append_only = true,
schema.location = 's3:///risingwave/proto-recursive',
Expand Down
34 changes: 0 additions & 34 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -685,40 +685,6 @@ profile:
- use: pubsub
persist-data: true

ci-kafka:
config-path: src/config/ci.toml
steps:
- use: minio
- use: etcd
unsafe-no-fsync: true
- use: meta-node
- use: compute-node
enable-tiered-cache: true
- use: frontend
- use: compactor
- use: zookeeper
persist-data: true
- use: kafka
persist-data: true

ci-kafka-plus-pubsub:
config-path: src/config/ci.toml
steps:
- use: minio
- use: etcd
unsafe-no-fsync: true
- use: meta-node
- use: compute-node
enable-tiered-cache: true
- use: frontend
- use: compactor
- use: zookeeper
persist-data: true
- use: kafka
persist-data: true
- use: pubsub
persist-data: true

ci-redis:
config-path: src/config/ci.toml
steps:
Expand Down

0 comments on commit c47799f

Please sign in to comment.