Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support schema registry in risedev #17001

Merged
merged 5 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion ci/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ services:
- "29092:29092"
- "9092:9092"
- "9644:9644"
- "8081:8081"
# Don't use Redpanda's schema registry, use the separated service instead
# - "8081:8081"
environment: {}
container_name: message_queue
healthcheck:
Expand Down Expand Up @@ -89,6 +90,7 @@ services:
- mysql
- db
- message_queue
- schemaregistry
- elasticsearch
- clickhouse-server
- redis-server
Expand Down
8 changes: 4 additions & 4 deletions ci/scripts/e2e-kafka-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -154,16 +154,16 @@ cp src/connector/src/test_data/proto_recursive/recursive.pb ./proto-recursive
rpk topic create test-rw-sink-append-only-protobuf
rpk topic create test-rw-sink-append-only-protobuf-csr-a
rpk topic create test-rw-sink-append-only-protobuf-csr-hi
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-append-only-protobuf-csr-a-value' src/connector/src/test_data/test-index-array.proto
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-append-only-protobuf-csr-hi-value' src/connector/src/test_data/test-index-array.proto
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-append-only-protobuf-csr-a-value' src/connector/src/test_data/test-index-array.proto
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-append-only-protobuf-csr-hi-value' src/connector/src/test_data/test-index-array.proto
sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/protobuf.slt'
rpk topic delete test-rw-sink-append-only-protobuf
rpk topic delete test-rw-sink-append-only-protobuf-csr-a
rpk topic delete test-rw-sink-append-only-protobuf-csr-hi

echo "testing avro"
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-value' src/connector/src/test_data/all-types.avsc
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-key' src/connector/src/test_data/all-types.avsc 'string_field,int32_field'
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-upsert-avro-value' src/connector/src/test_data/all-types.avsc
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-upsert-avro-key' src/connector/src/test_data/all-types.avsc 'string_field,int32_field'
rpk topic create test-rw-sink-upsert-avro
sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/avro.slt'
rpk topic delete test-rw-sink-upsert-avro
8 changes: 4 additions & 4 deletions ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,11 @@ export RISINGWAVE_CI=true
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
risedev ci-start ci-1cn-1fe
python3 -m pip install --break-system-packages requests protobuf confluent-kafka
python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://message_queue:8081" "sr_pb_test" 20 user
python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://schemaregistry:8082" "sr_pb_test" 20 user
echo "make sure google/protobuf/source_context.proto is NOT in schema registry"
curl --silent 'http://message_queue:8081/subjects'; echo
# curl --silent --head -X GET 'http://message_queue:8081/subjects/google%2Fprotobuf%2Fsource_context.proto/versions' | grep 404
curl --silent 'http://message_queue:8081/subjects' | grep -v 'google/protobuf/source_context.proto'
curl --silent 'http://schemaregistry:8082/subjects'; echo
# curl --silent --head -X GET 'http://schemaregistry:8082/subjects/google%2Fprotobuf%2Fsource_context.proto/versions' | grep 404
curl --silent 'http://schemaregistry:8082/subjects' | grep -v 'google/protobuf/source_context.proto'
risedev slt './e2e_test/schema_registry/pb.slt'
risedev slt './e2e_test/schema_registry/alter_sr.slt'

Expand Down
8 changes: 4 additions & 4 deletions e2e_test/schema_registry/alter_sr.slt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ CREATE SOURCE src_user WITH (
scan.startup.mode = 'earliest'
)
FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = 'http://message_queue:8081',
schema.registry = 'http://schemaregistry:8082',
message = 'test.User'
);

Expand All @@ -24,7 +24,7 @@ CREATE TABLE t_user WITH (
scan.startup.mode = 'earliest'
)
FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = 'http://message_queue:8081',
schema.registry = 'http://schemaregistry:8082',
message = 'test.User'
);

Expand All @@ -36,7 +36,7 @@ SELECT age FROM t_user;

# Push more events with extended fields
system ok
python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://message_queue:8081" "sr_pb_test" 5 user_with_more_fields
python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://schemaregistry:8082" "sr_pb_test" 5 user_with_more_fields

sleep 5s

Expand All @@ -58,7 +58,7 @@ SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM mv_user_more;

# Push more events with extended fields
system ok
python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://message_queue:8081" "sr_pb_test" 5 user_with_more_fields
python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://schemaregistry:8082" "sr_pb_test" 5 user_with_more_fields

sleep 5s

Expand Down
4 changes: 2 additions & 2 deletions e2e_test/schema_registry/pb.slt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ create table sr_pb_test with (
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest')
FORMAT plain ENCODE protobuf(
schema.registry = 'http://message_queue:8081',
schema.registry = 'http://schemaregistry:8082',
message = 'test.User'
);

Expand All @@ -21,7 +21,7 @@ create table sr_pb_test_bk with (
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest')
FORMAT plain ENCODE protobuf(
schema.registry = 'http://message_queue:8081,http://message_queue:8081',
schema.registry = 'http://schemaregistry:8082,http://schemaregistry:8082',
message = 'test.User'
);

Expand Down
12 changes: 6 additions & 6 deletions e2e_test/sink/kafka/avro.slt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ with (
topic = 'test-rw-sink-upsert-avro',
properties.bootstrap.server = 'message_queue:29092')
format upsert encode avro (
schema.registry = 'http://message_queue:8081');
schema.registry = 'http://schemaregistry:8082');

statement ok
create table into_kafka (
Expand Down Expand Up @@ -40,7 +40,7 @@ create sink sink0 from into_kafka with (
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'int32_field,string_field')
format upsert encode avro (
schema.registry = 'http://message_queue:8081');
schema.registry = 'http://schemaregistry:8082');

sleep 2s

Expand Down Expand Up @@ -72,7 +72,7 @@ create sink sink_err from into_kafka with (
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'int32_field,string_field')
format upsert encode avro (
schema.registry = 'http://message_queue:8081');
schema.registry = 'http://schemaregistry:8082');

statement error field not in avro
create sink sink_err as select 1 as extra_column, * from into_kafka with (
Expand All @@ -81,7 +81,7 @@ create sink sink_err as select 1 as extra_column, * from into_kafka with (
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'int32_field,string_field')
format upsert encode avro (
schema.registry = 'http://message_queue:8081');
schema.registry = 'http://schemaregistry:8082');

statement error unrecognized
create sink sink_err from into_kafka with (
Expand All @@ -90,7 +90,7 @@ create sink sink_err from into_kafka with (
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'int32_field,string_field')
format upsert encode avro (
schema.registry = 'http://message_queue:8081',
schema.registry = 'http://schemaregistry:8082',
schema.registry.name.strategy = 'typo');

statement error empty field key.message
Expand All @@ -100,7 +100,7 @@ create sink sink_err from into_kafka with (
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'int32_field,string_field')
format upsert encode avro (
schema.registry = 'http://message_queue:8081',
schema.registry = 'http://schemaregistry:8082',
schema.registry.name.strategy = 'record_name_strategy');

statement ok
Expand Down
8 changes: 4 additions & 4 deletions e2e_test/sink/kafka/protobuf.slt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ create table from_kafka_csr_trivial with (
topic = 'test-rw-sink-append-only-protobuf-csr-a',
properties.bootstrap.server = 'message_queue:29092')
format plain encode protobuf (
schema.registry = 'http://message_queue:8081',
schema.registry = 'http://schemaregistry:8082',
message = 'test.package.MessageA');

statement ok
Expand All @@ -22,7 +22,7 @@ create table from_kafka_csr_nested with (
topic = 'test-rw-sink-append-only-protobuf-csr-hi',
properties.bootstrap.server = 'message_queue:29092')
format plain encode protobuf (
schema.registry = 'http://message_queue:8081',
schema.registry = 'http://schemaregistry:8082',
message = 'test.package.MessageH.MessageI');

statement ok
Expand Down Expand Up @@ -68,7 +68,7 @@ create sink sink_csr_trivial as select string_field as field_a from into_kafka w
properties.bootstrap.server = 'message_queue:29092')
format plain encode protobuf (
force_append_only = true,
schema.registry = 'http://message_queue:8081',
schema.registry = 'http://schemaregistry:8082',
message = 'test.package.MessageA');

statement ok
Expand All @@ -78,7 +78,7 @@ create sink sink_csr_nested as select sint32_field as field_i from into_kafka wi
properties.bootstrap.server = 'message_queue:29092')
format plain encode protobuf (
force_append_only = true,
schema.registry = 'http://message_queue:8081',
schema.registry = 'http://schemaregistry:8082',
message = 'test.package.MessageH.MessageI');

sleep 2s
Expand Down
31 changes: 17 additions & 14 deletions e2e_test/source/basic/nosim_kafka.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
control substitution on
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only try RISEDEV_SCHEMA_REGISTRY_URL in this test. Not bother to modify others.


# FIXME: does this really work??
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😄

# Start with nosim to avoid running in deterministic test


Expand All @@ -7,18 +10,18 @@ CREATE TABLE upsert_avro_json_default_key ( primary key (rw_key) )
INCLUDE KEY AS rw_key
WITH (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
properties.bootstrap.server = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}',
topic = 'upsert_avro_json')
FORMAT UPSERT ENCODE AVRO (schema.registry = 'http://message_queue:8081');
FORMAT UPSERT ENCODE AVRO (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}');

statement ok
CREATE TABLE upsert_student_avro_json ( primary key (rw_key) )
INCLUDE KEY AS rw_key
WITH (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
properties.bootstrap.server = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}',
topic = 'upsert_student_avro_json')
FORMAT UPSERT ENCODE AVRO (schema.registry = 'http://message_queue:8081');
FORMAT UPSERT ENCODE AVRO (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}');


# TODO: Uncomment this when we add test data kafka key with format `"ID":id`
Expand All @@ -28,45 +31,45 @@ FORMAT UPSERT ENCODE AVRO (schema.registry = 'http://message_queue:8081');
# )
# WITH (
# connector = 'kafka',
# properties.bootstrap.server = 'message_queue:29092',
# properties.bootstrap.server = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}',
# topic = 'upsert_avro_json')
# FORMAT UPSERT ENCODE AVRO (schema.registry = 'http://message_queue:8081');
# FORMAT UPSERT ENCODE AVRO (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}');


statement ok
CREATE TABLE debezium_non_compact (PRIMARY KEY(order_id)) with (
connector = 'kafka',
kafka.topic = 'debezium_non_compact_avro_json',
kafka.brokers = 'message_queue:29092',
kafka.brokers = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}',
kafka.scan.startup.mode = 'earliest'
) FORMAT DEBEZIUM ENCODE AVRO (schema.registry = 'http://message_queue:8081');
) FORMAT DEBEZIUM ENCODE AVRO (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}');


statement ok
CREATE TABLE debezium_compact (PRIMARY KEY(order_id)) with (
connector = 'kafka',
kafka.topic = 'debezium_compact_avro_json',
kafka.brokers = 'message_queue:29092',
kafka.brokers = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}',
kafka.scan.startup.mode = 'earliest'
) FORMAT DEBEZIUM ENCODE AVRO (schema.registry = 'http://message_queue:8081');
) FORMAT DEBEZIUM ENCODE AVRO (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}');

statement ok
CREATE TABLE kafka_json_schema_plain with (
connector = 'kafka',
kafka.topic = 'kafka_json_schema',
kafka.brokers = 'message_queue:29092',
kafka.brokers = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}',
kafka.scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON (schema.registry = 'http://schemaregistry:8082');
) FORMAT PLAIN ENCODE JSON (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}');

statement ok
CREATE TABLE kafka_json_schema_upsert (PRIMARY KEY(rw_key))
INCLUDE KEY AS rw_key
with (
connector = 'kafka',
kafka.topic = 'kafka_upsert_json_schema',
kafka.brokers = 'message_queue:29092',
kafka.brokers = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}',
kafka.scan.startup.mode = 'earliest'
) FORMAT UPSERT ENCODE JSON (schema.registry = 'http://schemaregistry:8082');
) FORMAT UPSERT ENCODE JSON (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}');

statement ok
flush;
Expand Down
10 changes: 5 additions & 5 deletions e2e_test/source/basic/schema_registry.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ create source s1 () with (
topic = 'upsert_avro_json-record',
properties.bootstrap.server = 'message_queue:29092'
) format plain encode avro (
schema.registry = 'http://message_queue:8081',
schema.registry = 'http://schemaregistry:8082',
schema.registry.name.strategy = 'no sense',
message = 'CPLM.OBJ_ATTRIBUTE_VALUE',
);
Expand All @@ -17,7 +17,7 @@ create source s1 () with (
topic = 'upsert_avro_json-record',
properties.bootstrap.server = 'message_queue:29092'
) format plain encode avro (
schema.registry = 'http://message_queue:8081',
schema.registry = 'http://schemaregistry:8082',
schema.registry.name.strategy = 'record_name_strategy',
message = 'CPLM.OBJ_ATTRIBUTE_VALUE',
key.message = 'string'
Expand All @@ -29,7 +29,7 @@ create source s1 () with (
topic = 'upsert_avro_json-record',
properties.bootstrap.server = 'message_queue:29092'
) format plain encode avro (
schema.registry = 'http://message_queue:8081',
schema.registry = 'http://schemaregistry:8082',
schema.registry.name.strategy = 'record_name_strategy',
message = 'CPLM.OBJ_ATTRIBUTE_VALUE',
);
Expand All @@ -41,7 +41,7 @@ create table t1 () with (
topic = 'upsert_avro_json-topic-record',
properties.bootstrap.server = 'message_queue:29092'
) format upsert encode avro (
schema.registry = 'http://message_queue:8081',
schema.registry = 'http://schemaregistry:8082',
schema.registry.name.strategy = 'topic_record_name_strategy',
message = 'CPLM.OBJ_ATTRIBUTE_VALUE'
);
Expand All @@ -54,7 +54,7 @@ with (
topic = 'upsert_avro_json-topic-record',
properties.bootstrap.server = 'message_queue:29092'
) format upsert encode avro (
schema.registry = 'http://message_queue:8081',
schema.registry = 'http://schemaregistry:8082',
schema.registry.name.strategy = 'topic_record_name_strategy',
message = 'CPLM.OBJ_ATTRIBUTE_VALUE',
key.message = 'string'
Expand Down
26 changes: 23 additions & 3 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,10 @@ profile:
user-managed: true
address: message_queue
port: 29092
- use: schema-registry
user-managed: true
address: schemaregistry
port: 8082

ci-inline-source-test:
config-path: src/config/ci-recovery.toml
Expand Down Expand Up @@ -1431,9 +1435,8 @@ template:

# Listen port of KRaft controller
controller-port: 29093

# Listen address
listen-address: ${address}
Comment on lines -1435 to -1436
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

listen-address is unused

# Listen port for other services in docker (schema-registry)
docker-port: 29094

# The docker image. Can be overridden to use a different version.
image: "confluentinc/cp-kafka:7.6.1"
Expand All @@ -1446,6 +1449,23 @@ template:

user-managed: false

schema-registry:
# Id to be picked-up by services
id: schema-registry-${port}

# Advertise address
address: "127.0.0.1"

# Listen port of Schema Registry
port: 8081

# The docker image. Can be overridden to use a different version.
image: "confluentinc/cp-schema-registry:7.6.1"

user-managed: false

provide-kafka: "kafka*"

# Google pubsub emulator service
pubsub:
id: pubsub-${port}
Expand Down
6 changes: 3 additions & 3 deletions scripts/source/prepare_ci_kafka.sh
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ for filename in $kafka_data_files; do
if [[ "$topic" = *bin ]]; then
kcat -P -b message_queue:29092 -t "$topic" "$filename"
elif [[ "$topic" = *avro_json ]]; then
python3 source/schema_registry_producer.py "message_queue:29092" "http://message_queue:8081" "$filename" "topic" "avro"
python3 source/schema_registry_producer.py "message_queue:29092" "http://schemaregistry:8082" "$filename" "topic" "avro"
elif [[ "$topic" = *json_schema ]]; then
python3 source/schema_registry_producer.py "message_queue:29092" "http://schemaregistry:8082" "$filename" "topic" "json"
else
Expand All @@ -72,9 +72,9 @@ for i in {0..100}; do echo "key$i:{\"a\": $i}" | kcat -P -b message_queue:29092
# write schema with name strategy

## topic: upsert_avro_json-record, key subject: string, value subject: CPLM.OBJ_ATTRIBUTE_VALUE
(python3 source/schema_registry_producer.py "message_queue:29092" "http://message_queue:8081" source/test_data/upsert_avro_json.1 "record" "avro") &
(python3 source/schema_registry_producer.py "message_queue:29092" "http://schemaregistry:8082" source/test_data/upsert_avro_json.1 "record" "avro") &
## topic: upsert_avro_json-topic-record,
## key subject: upsert_avro_json-topic-record-string
## value subject: upsert_avro_json-topic-record-CPLM.OBJ_ATTRIBUTE_VALUE
(python3 source/schema_registry_producer.py "message_queue:29092" "http://message_queue:8081" source/test_data/upsert_avro_json.1 "topic-record" "avro") &
(python3 source/schema_registry_producer.py "message_queue:29092" "http://schemaregistry:8082" source/test_data/upsert_avro_json.1 "topic-record" "avro") &
wait
Loading
Loading