Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
Browse files Browse the repository at this point in the history
Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx committed May 27, 2024
2 parents 3d5917c + 9c6f01b commit a46061b
Show file tree
Hide file tree
Showing 61 changed files with 1,254 additions and 779 deletions.
28 changes: 19 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 0 additions & 24 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ extend = [
{ path = "src/risedevtool/minio.toml" },
{ path = "src/risedevtool/etcd.toml" },
{ path = "src/risedevtool/tempo.toml" },
{ path = "src/risedevtool/kafka.toml" },
{ path = "src/risedevtool/gcloud-pubsub.toml" },
{ path = "src/risedevtool/redis.toml" },
{ path = "src/risedevtool/connector.toml" },
Expand Down Expand Up @@ -498,7 +497,6 @@ dependencies = [
"download-etcd",
"download-grafana",
"download-tempo",
"download-kafka",
"download-mcli",
"download-minio",
"download-prometheus",
Expand Down Expand Up @@ -556,7 +554,6 @@ dependencies = [
"download-grafana",
"download-prometheus",
"download-tempo",
"download-kafka",
"download-redis",
]

Expand Down Expand Up @@ -707,27 +704,13 @@ script = '''
set -euo pipefail
wait_kafka_exit() {
# Follow kafka-server-stop.sh
while [[ -n "$(ps ax | grep ' kafka\.Kafka ' | grep java | grep -v grep | awk '{print $1}')" ]]; do
echo "Waiting for kafka to exit"
sleep 1
done
}
kill_kafka() {
${PREFIX_BIN}/kafka/bin/kafka-server-stop.sh
wait_kafka_exit
}
if ! ${TMUX} ls &>/dev/null ; then
echo "No risedev cluster to kill. Exiting..."
exit 0
fi
# Kill other components with Ctrl+C/Ctrl+D
${TMUX} list-windows -F "#{window_name} #{pane_id}" \
| grep --invert-match --extended-regexp '(kafka)' \
| awk '{ print $2 }' \
| xargs -I {} ${TMUX} send-keys -t {} C-c C-d
Expand All @@ -738,13 +721,6 @@ if [[ -n ${containers} ]]; then
docker stop ${containers}
fi
# Kill kafka cleanly. Ctrl+C will lose data.
if [[ -n $(${TMUX} list-windows | grep kafka) ]];
then
echo "kill kafka"
kill_kafka || true
fi
${TMUX} kill-server
test $? -eq 0 || { echo "Failed to stop all RiseDev components."; exit 1; }
'''
Expand Down
28 changes: 2 additions & 26 deletions ci/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -238,44 +238,20 @@ services:
# # protobuf/avro schema registry. Should be removed after the support.
# # Related tracking issue:
# # https://github.com/redpanda-data/redpanda/issues/1878
zookeeper:
container_name: zookeeper
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"

schemaregistry:
container_name: schemaregistry
hostname: schemaregistry
image: confluentinc/cp-schema-registry:latest
depends_on:
- kafka
- message_queue
ports:
- "8082:8082"
environment:
SCHEMA_REGISTRY_HOST_NAME: schemaregistry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
SCHEMA_REGISTRY_LISTENERS: http://schemaregistry:8082
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9093,PLAINTEXT_INTERNAL://localhost:29093
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: message_queue:29092
SCHEMA_REGISTRY_DEBUG: 'true'

kafka:
container_name: kafka
image: confluentinc/cp-kafka:latest
ports:
- "29093:29093"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9093,PLAINTEXT_INTERNAL://localhost:29093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

pulsar-server:
container_name: pulsar-server
image: apachepulsar/pulsar:latest
Expand Down
1 change: 0 additions & 1 deletion ci/risedev-components.ci.benchmark.env
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
RISEDEV_CONFIGURED=true
ENABLE_MINIO=true
ENABLE_ETCD=true
ENABLE_KAFKA=true
ENABLE_BUILD_RUST=true
ENABLE_RELEASE_PROFILE=true
ENABLE_PROMETHEUS_GRAFANA=true
1 change: 0 additions & 1 deletion ci/risedev-components.ci.source.env
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
RISEDEV_CONFIGURED=true
ENABLE_MINIO=true
ENABLE_ETCD=true
ENABLE_KAFKA=true
ENABLE_PUBSUB=true
78 changes: 40 additions & 38 deletions ci/scripts/e2e-kafka-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,46 +3,48 @@
# Exits as soon as any line fails.
set -euo pipefail

./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-without-snapshot --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-text-key-id --create > /dev/null 2>&1
export RPK_BROKERS="message_queue:29092"

rpk topic create test-rw-sink-append-only
rpk topic create test-rw-sink-upsert
rpk topic create test-rw-sink-upsert-schema
rpk topic create test-rw-sink-debezium
rpk topic create test-rw-sink-without-snapshot
rpk topic create test-rw-sink-text-key-id

sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/create_sink.slt'
sleep 2

# test append-only kafka sink
echo "testing append-only kafka sink"
diff ./e2e_test/sink/kafka/append_only1.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only --from-beginning --max-messages 10 | sort) 2> /dev/null)
diff -b ./e2e_test/sink/kafka/append_only1.result \
<((rpk topic consume test-rw-sink-append-only --offset start --format '%v\n' --num 10 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for append-only sink is not as expected."
exit 1
fi

# test upsert kafka sink
echo "testing upsert kafka sink"
diff ./e2e_test/sink/kafka/upsert1.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 10 | sort) 2> /dev/null)
diff -b ./e2e_test/sink/kafka/upsert1.result \
<((rpk topic consume test-rw-sink-upsert --offset start --format '%k\t%v\n' --num 10 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink is not as expected."
exit 1
fi

# test upsert kafka sink with schema
echo "testing upsert kafka sink with schema"
diff ./e2e_test/sink/kafka/upsert_schema1.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 10 | sort) 2> /dev/null)
diff -b ./e2e_test/sink/kafka/upsert_schema1.result \
<((rpk topic consume test-rw-sink-upsert-schema --offset start --format '%k\t%v\n' --num 10 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink with schema is not as expected."
exit 1
fi

# test debezium kafka sink
echo "testing debezium kafka sink"
(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 10 | sort) > ./e2e_test/sink/kafka/debezium1.tmp.result 2> /dev/null
(rpk topic consume test-rw-sink-debezium --offset start --format '%k\t%v\n' --num 10 | sort) > ./e2e_test/sink/kafka/debezium1.tmp.result 2> /dev/null
python3 e2e_test/sink/kafka/debezium.py e2e_test/sink/kafka/debezium1.result e2e_test/sink/kafka/debezium1.tmp.result
if [ $? -ne 0 ]; then
echo "The output for debezium sink is not as expected."
Expand All @@ -58,34 +60,34 @@ psql -h localhost -p 4566 -d dev -U root -c "update t_kafka set v_varchar = '',

# test append-only kafka sink after update
echo "testing append-only kafka sink after updating data"
diff ./e2e_test/sink/kafka/append_only2.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only --from-beginning --max-messages 11 | sort) 2> /dev/null)
diff -b ./e2e_test/sink/kafka/append_only2.result \
<((rpk topic consume test-rw-sink-append-only --offset start --format '%v\n' --num 11 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for append-only sink after update is not as expected."
exit 1
fi

# test upsert kafka sink after update
echo "testing upsert kafka sink after updating data"
diff ./e2e_test/sink/kafka/upsert2.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 11 | sort) 2> /dev/null)
diff -b ./e2e_test/sink/kafka/upsert2.result \
<((rpk topic consume test-rw-sink-upsert --offset start --format '%k\t%v\n' --num 11 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink after update is not as expected."
exit 1
fi

# test upsert kafka sink with schema after update
echo "testing upsert kafka sink with schema after updating data"
diff ./e2e_test/sink/kafka/upsert_schema2.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 11 | sort) 2> /dev/null)
diff -b ./e2e_test/sink/kafka/upsert_schema2.result \
<((rpk topic consume test-rw-sink-upsert-schema --offset start --format '%k\t%v\n' --num 11 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink with schema is not as expected."
exit 1
fi

# test debezium kafka sink after update
echo "testing debezium kafka sink after updating data"
(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 11 | sort) > ./e2e_test/sink/kafka/debezium2.tmp.result 2> /dev/null
(rpk topic consume test-rw-sink-debezium --offset start --format '%k\t%v\n' --num 11 | sort) > ./e2e_test/sink/kafka/debezium2.tmp.result 2> /dev/null
python3 e2e_test/sink/kafka/debezium.py e2e_test/sink/kafka/debezium2.result e2e_test/sink/kafka/debezium2.tmp.result
if [ $? -ne 0 ]; then
echo "The output for debezium sink after update is not as expected."
Expand All @@ -97,8 +99,8 @@ fi

# test without-snapshot kafka sink
echo "testing without-snapshot kafka sink"
diff ./e2e_test/sink/kafka/without_snapshot.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-without-snapshot --from-beginning --max-messages 3 | sort) 2> /dev/null)
diff -b ./e2e_test/sink/kafka/without_snapshot.result \
<((rpk topic consume test-rw-sink-without-snapshot --offset start --format '%v\n' --num 3 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for append-only sink is not as expected."
exit 1
Expand All @@ -110,25 +112,25 @@ psql -h localhost -p 4566 -d dev -U root -c "delete from t_kafka where id = 1;"

# test upsert kafka sink after delete
echo "testing upsert kafka sink after deleting data"
diff ./e2e_test/sink/kafka/upsert3.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 12 | sort) 2> /dev/null)
diff -b ./e2e_test/sink/kafka/upsert3.result \
<((rpk topic consume test-rw-sink-upsert --offset start --format '%k\t%v\n' --num 12 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink after update is not as expected."
exit 1
fi

# test upsert kafka sink with schema after delete
echo "testing upsert kafka sink with schema after deleting data"
diff ./e2e_test/sink/kafka/upsert_schema3.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 12 | sort) 2> /dev/null)
diff -b ./e2e_test/sink/kafka/upsert_schema3.result \
<((rpk topic consume test-rw-sink-upsert-schema --offset start --format '%k\t%v\n' --num 12 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink with schema is not as expected."
exit 1
fi

# test debezium kafka sink after delete
echo "testing debezium kafka sink after deleting data"
(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 13 | sort) > ./e2e_test/sink/kafka/debezium3.tmp.result 2> /dev/null
(rpk topic consume test-rw-sink-debezium --offset start --format '%k\t%v\n' --num 13 | sort) > ./e2e_test/sink/kafka/debezium3.tmp.result 2> /dev/null
python3 e2e_test/sink/kafka/debezium.py e2e_test/sink/kafka/debezium3.result e2e_test/sink/kafka/debezium3.tmp.result
if [ $? -ne 0 ]; then
echo "The output for debezium sink after delete is not as expected."
Expand All @@ -139,29 +141,29 @@ else
fi

sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/drop_sink.slt'
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only --delete > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --delete > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --delete > /dev/null 2>&1
rpk topic delete test-rw-sink-append-only
rpk topic delete test-rw-sink-upsert
rpk topic delete test-rw-sink-debezium

# test different encoding
echo "preparing confluent schema registry"
python3 -m pip install --break-system-packages requests confluent-kafka

echo "testing protobuf"
cp src/connector/src/test_data/proto_recursive/recursive.pb ./proto-recursive
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-a --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-hi --create > /dev/null 2>&1
rpk topic create test-rw-sink-append-only-protobuf
rpk topic create test-rw-sink-append-only-protobuf-csr-a
rpk topic create test-rw-sink-append-only-protobuf-csr-hi
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-append-only-protobuf-csr-a-value' src/connector/src/test_data/test-index-array.proto
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-append-only-protobuf-csr-hi-value' src/connector/src/test_data/test-index-array.proto
sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/protobuf.slt'
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-hi --delete > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-a --delete > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --delete > /dev/null 2>&1
rpk topic delete test-rw-sink-append-only-protobuf
rpk topic delete test-rw-sink-append-only-protobuf-csr-a
rpk topic delete test-rw-sink-append-only-protobuf-csr-hi

echo "testing avro"
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-value' src/connector/src/test_data/all-types.avsc
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-key' src/connector/src/test_data/all-types.avsc 'string_field,int32_field'
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-avro --create > /dev/null 2>&1
rpk topic create test-rw-sink-upsert-avro
sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/avro.slt'
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-avro --delete > /dev/null 2>&1
rpk topic delete test-rw-sink-upsert-avro
2 changes: 1 addition & 1 deletion ci/scripts/release.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ dnf install -y lld
ld.lld --version

echo "--- Install dependencies"
dnf install -y perl-core wget python3 python3-devel cyrus-sasl-devel rsync
dnf install -y perl-core wget python3 python3-devel cyrus-sasl-devel rsync openssl-devel

echo "--- Install java and maven"
dnf install -y java-11-openjdk java-11-openjdk-devel
Expand Down
Loading

0 comments on commit a46061b

Please sign in to comment.