Skip to content

Commit

Permalink
feat(risedev): use docker for kafka service (#16536)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored May 24, 2024
1 parent 1aa8579 commit e5ad5d9
Show file tree
Hide file tree
Showing 19 changed files with 107 additions and 429 deletions.
24 changes: 0 additions & 24 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ extend = [
{ path = "src/risedevtool/minio.toml" },
{ path = "src/risedevtool/etcd.toml" },
{ path = "src/risedevtool/tempo.toml" },
{ path = "src/risedevtool/kafka.toml" },
{ path = "src/risedevtool/gcloud-pubsub.toml" },
{ path = "src/risedevtool/redis.toml" },
{ path = "src/risedevtool/connector.toml" },
Expand Down Expand Up @@ -498,7 +497,6 @@ dependencies = [
"download-etcd",
"download-grafana",
"download-tempo",
"download-kafka",
"download-mcli",
"download-minio",
"download-prometheus",
Expand Down Expand Up @@ -556,7 +554,6 @@ dependencies = [
"download-grafana",
"download-prometheus",
"download-tempo",
"download-kafka",
"download-redis",
]

Expand Down Expand Up @@ -707,27 +704,13 @@ script = '''
set -euo pipefail
wait_kafka_exit() {
# Follow kafka-server-stop.sh
while [[ -n "$(ps ax | grep ' kafka\.Kafka ' | grep java | grep -v grep | awk '{print $1}')" ]]; do
echo "Waiting for kafka to exit"
sleep 1
done
}
kill_kafka() {
${PREFIX_BIN}/kafka/bin/kafka-server-stop.sh
wait_kafka_exit
}
if ! ${TMUX} ls &>/dev/null ; then
echo "No risedev cluster to kill. Exiting..."
exit 0
fi
# Kill other components with Ctrl+C/Ctrl+D
${TMUX} list-windows -F "#{window_name} #{pane_id}" \
| grep --invert-match --extended-regexp '(kafka)' \
| awk '{ print $2 }' \
| xargs -I {} ${TMUX} send-keys -t {} C-c C-d
Expand All @@ -738,13 +721,6 @@ if [[ -n ${containers} ]]; then
docker stop ${containers}
fi
# Kill kafka cleanly. Ctrl+C will lose data.
if [[ -n $(${TMUX} list-windows | grep kafka) ]];
then
echo "kill kafka"
kill_kafka || true
fi
${TMUX} kill-server
test $? -eq 0 || { echo "Failed to stop all RiseDev components."; exit 1; }
'''
Expand Down
1 change: 0 additions & 1 deletion ci/risedev-components.ci.benchmark.env
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
RISEDEV_CONFIGURED=true
ENABLE_MINIO=true
ENABLE_ETCD=true
ENABLE_KAFKA=true
ENABLE_BUILD_RUST=true
ENABLE_RELEASE_PROFILE=true
ENABLE_PROMETHEUS_GRAFANA=true
1 change: 0 additions & 1 deletion ci/risedev-components.ci.source.env
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
RISEDEV_CONFIGURED=true
ENABLE_MINIO=true
ENABLE_ETCD=true
ENABLE_KAFKA=true
ENABLE_PUBSUB=true
78 changes: 40 additions & 38 deletions ci/scripts/e2e-kafka-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,46 +3,48 @@
# Exits as soon as any line fails.
set -euo pipefail

./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-without-snapshot --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-text-key-id --create > /dev/null 2>&1
export RPK_BROKERS="message_queue:29092"

rpk topic create test-rw-sink-append-only
rpk topic create test-rw-sink-upsert
rpk topic create test-rw-sink-upsert-schema
rpk topic create test-rw-sink-debezium
rpk topic create test-rw-sink-without-snapshot
rpk topic create test-rw-sink-text-key-id

sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/create_sink.slt'
sleep 2

# test append-only kafka sink
echo "testing append-only kafka sink"
diff ./e2e_test/sink/kafka/append_only1.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only --from-beginning --max-messages 10 | sort) 2> /dev/null)
diff -b ./e2e_test/sink/kafka/append_only1.result \
<((rpk topic consume test-rw-sink-append-only --offset start --format '%v\n' --num 10 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for append-only sink is not as expected."
exit 1
fi

# test upsert kafka sink
echo "testing upsert kafka sink"
diff ./e2e_test/sink/kafka/upsert1.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 10 | sort) 2> /dev/null)
diff -b ./e2e_test/sink/kafka/upsert1.result \
<((rpk topic consume test-rw-sink-upsert --offset start --format '%k\t%v\n' --num 10 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink is not as expected."
exit 1
fi

# test upsert kafka sink with schema
echo "testing upsert kafka sink with schema"
diff ./e2e_test/sink/kafka/upsert_schema1.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 10 | sort) 2> /dev/null)
diff -b ./e2e_test/sink/kafka/upsert_schema1.result \
<((rpk topic consume test-rw-sink-upsert-schema --offset start --format '%k\t%v\n' --num 10 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink with schema is not as expected."
exit 1
fi

# test debezium kafka sink
echo "testing debezium kafka sink"
(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 10 | sort) > ./e2e_test/sink/kafka/debezium1.tmp.result 2> /dev/null
(rpk topic consume test-rw-sink-debezium --offset start --format '%k\t%v\n' --num 10 | sort) > ./e2e_test/sink/kafka/debezium1.tmp.result 2> /dev/null
python3 e2e_test/sink/kafka/debezium.py e2e_test/sink/kafka/debezium1.result e2e_test/sink/kafka/debezium1.tmp.result
if [ $? -ne 0 ]; then
echo "The output for debezium sink is not as expected."
Expand All @@ -58,34 +60,34 @@ psql -h localhost -p 4566 -d dev -U root -c "update t_kafka set v_varchar = '',

# test append-only kafka sink after update
echo "testing append-only kafka sink after updating data"
diff ./e2e_test/sink/kafka/append_only2.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only --from-beginning --max-messages 11 | sort) 2> /dev/null)
diff -b ./e2e_test/sink/kafka/append_only2.result \
<((rpk topic consume test-rw-sink-append-only --offset start --format '%v\n' --num 11 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for append-only sink after update is not as expected."
exit 1
fi

# test upsert kafka sink after update
echo "testing upsert kafka sink after updating data"
diff ./e2e_test/sink/kafka/upsert2.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 11 | sort) 2> /dev/null)
diff -b ./e2e_test/sink/kafka/upsert2.result \
<((rpk topic consume test-rw-sink-upsert --offset start --format '%k\t%v\n' --num 11 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink after update is not as expected."
exit 1
fi

# test upsert kafka sink with schema after update
echo "testing upsert kafka sink with schema after updating data"
diff ./e2e_test/sink/kafka/upsert_schema2.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 11 | sort) 2> /dev/null)
diff -b ./e2e_test/sink/kafka/upsert_schema2.result \
<((rpk topic consume test-rw-sink-upsert-schema --offset start --format '%k\t%v\n' --num 11 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink with schema is not as expected."
exit 1
fi

# test debezium kafka sink after update
echo "testing debezium kafka sink after updating data"
(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 11 | sort) > ./e2e_test/sink/kafka/debezium2.tmp.result 2> /dev/null
(rpk topic consume test-rw-sink-debezium --offset start --format '%k\t%v\n' --num 11 | sort) > ./e2e_test/sink/kafka/debezium2.tmp.result 2> /dev/null
python3 e2e_test/sink/kafka/debezium.py e2e_test/sink/kafka/debezium2.result e2e_test/sink/kafka/debezium2.tmp.result
if [ $? -ne 0 ]; then
echo "The output for debezium sink after update is not as expected."
Expand All @@ -97,8 +99,8 @@ fi

# test without-snapshot kafka sink
echo "testing without-snapshot kafka sink"
diff ./e2e_test/sink/kafka/without_snapshot.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-without-snapshot --from-beginning --max-messages 3 | sort) 2> /dev/null)
diff -b ./e2e_test/sink/kafka/without_snapshot.result \
<((rpk topic consume test-rw-sink-without-snapshot --offset start --format '%v\n' --num 3 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for append-only sink is not as expected."
exit 1
Expand All @@ -110,25 +112,25 @@ psql -h localhost -p 4566 -d dev -U root -c "delete from t_kafka where id = 1;"

# test upsert kafka sink after delete
echo "testing upsert kafka sink after deleting data"
diff ./e2e_test/sink/kafka/upsert3.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 12 | sort) 2> /dev/null)
diff -b ./e2e_test/sink/kafka/upsert3.result \
<((rpk topic consume test-rw-sink-upsert --offset start --format '%k\t%v\n' --num 12 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink after update is not as expected."
exit 1
fi

# test upsert kafka sink with schema after delete
echo "testing upsert kafka sink with schema after deleting data"
diff ./e2e_test/sink/kafka/upsert_schema3.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 12 | sort) 2> /dev/null)
diff -b ./e2e_test/sink/kafka/upsert_schema3.result \
<((rpk topic consume test-rw-sink-upsert-schema --offset start --format '%k\t%v\n' --num 12 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink with schema is not as expected."
exit 1
fi

# test debezium kafka sink after delete
echo "testing debezium kafka sink after deleting data"
(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 13 | sort) > ./e2e_test/sink/kafka/debezium3.tmp.result 2> /dev/null
(rpk topic consume test-rw-sink-debezium --offset start --format '%k\t%v\n' --num 13 | sort) > ./e2e_test/sink/kafka/debezium3.tmp.result 2> /dev/null
python3 e2e_test/sink/kafka/debezium.py e2e_test/sink/kafka/debezium3.result e2e_test/sink/kafka/debezium3.tmp.result
if [ $? -ne 0 ]; then
echo "The output for debezium sink after delete is not as expected."
Expand All @@ -139,29 +141,29 @@ else
fi

sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/drop_sink.slt'
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only --delete > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --delete > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --delete > /dev/null 2>&1
rpk topic delete test-rw-sink-append-only
rpk topic delete test-rw-sink-upsert
rpk topic delete test-rw-sink-debezium

# test different encoding
echo "preparing confluent schema registry"
python3 -m pip install --break-system-packages requests confluent-kafka

echo "testing protobuf"
cp src/connector/src/test_data/proto_recursive/recursive.pb ./proto-recursive
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-a --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-hi --create > /dev/null 2>&1
rpk topic create test-rw-sink-append-only-protobuf
rpk topic create test-rw-sink-append-only-protobuf-csr-a
rpk topic create test-rw-sink-append-only-protobuf-csr-hi
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-append-only-protobuf-csr-a-value' src/connector/src/test_data/test-index-array.proto
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-append-only-protobuf-csr-hi-value' src/connector/src/test_data/test-index-array.proto
sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/protobuf.slt'
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-hi --delete > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-a --delete > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --delete > /dev/null 2>&1
rpk topic delete test-rw-sink-append-only-protobuf
rpk topic delete test-rw-sink-append-only-protobuf-csr-a
rpk topic delete test-rw-sink-append-only-protobuf-csr-hi

echo "testing avro"
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-value' src/connector/src/test_data/all-types.avsc
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-key' src/connector/src/test_data/all-types.avsc 'string_field,int32_field'
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-avro --create > /dev/null 2>&1
rpk topic create test-rw-sink-upsert-avro
sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/avro.slt'
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-avro --delete > /dev/null 2>&1
rpk topic delete test-rw-sink-upsert-avro
14 changes: 6 additions & 8 deletions e2e_test/sink/kafka/debezium.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
# debezium sink sends k/v pair
kv = line.split()
key = json.loads(kv[0])
# kafka consumer outputs string "null" for null payload
if kv[1] == "null":
value = kv[1]
# rpk output nothing for null payload
if len(kv) == 1:
value = None
else:
value = json.loads(kv[1])
# The `ts_ms` field may vary, so we delete it from the json object
Expand All @@ -26,12 +26,10 @@
with open(test_output_file) as file:
for line in file:
kv = line.split()
if len(kv) != 2:
print(line)
assert(len(kv) == 2)
key = json.loads(kv[0])
if kv[1] == "null":
value = kv[1]
# rpk output nothing for null payload
if len(kv) == 1:
value = None
else:
value = json.loads(kv[1])
# Assert `ts_ms` is an integer here.
Expand Down
Loading

0 comments on commit e5ad5d9

Please sign in to comment.