diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 919046bea586b..c4b4713af81cc 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -135,20 +135,7 @@ risedev slt './e2e_test/source/cdc/cdc_share_stream_drop.slt' echo "--- Kill cluster" risedev ci-kill - -echo "--- e2e, ci-1cn-1fe, protobuf schema registry" export RISINGWAVE_CI=true -RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ -risedev ci-start ci-1cn-1fe -echo "make sure google/protobuf/source_context.proto is NOT in schema registry" -curl --silent 'http://schemaregistry:8082/subjects'; echo -# curl --silent --head -X GET 'http://schemaregistry:8082/subjects/google%2Fprotobuf%2Fsource_context.proto/versions' | grep 404 -curl --silent 'http://schemaregistry:8082/subjects' | grep -v 'google/protobuf/source_context.proto' -risedev slt './e2e_test/schema_registry/pb.slt' -risedev slt './e2e_test/schema_registry/alter_sr.slt' - -echo "--- Kill cluster" -risedev ci-kill echo "--- e2e, ci-kafka-plus-pubsub, kafka and pubsub source" RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ diff --git a/e2e_test/schema_registry/pb.slt b/e2e_test/schema_registry/pb.slt deleted file mode 100644 index 7b60b4fa8d7a4..0000000000000 --- a/e2e_test/schema_registry/pb.slt +++ /dev/null @@ -1,50 +0,0 @@ -# Before running this test, seed data into kafka: -# python3 e2e_test/schema_registry/pb.py - -# Create a table. -statement ok -create table sr_pb_test with ( - connector = 'kafka', - topic = 'sr_pb_test', - properties.bootstrap.server = 'message_queue:29092', - scan.startup.mode = 'earliest') -FORMAT plain ENCODE protobuf( - schema.registry = 'http://schemaregistry:8082', - message = 'test.User' - ); - -# for multiple schema registry nodes -statement ok -create table sr_pb_test_bk with ( - connector = 'kafka', - topic = 'sr_pb_test', - properties.bootstrap.server = 'message_queue:29092', - scan.startup.mode = 'earliest') -FORMAT plain ENCODE protobuf( - schema.registry = 'http://schemaregistry:8082,http://schemaregistry:8082', - message = 'test.User' - ); - -# Wait for source -sleep 10s - -# Flush into storage -statement ok -flush; - -query I -select count(*) from sr_pb_test; ----- -20 - -query IIT -select min(id), max(id), max((sc).file_name) from sr_pb_test; ----- -0 19 source/context_019.proto - - -statement ok -drop table sr_pb_test; - -statement ok -drop table sr_pb_test_bk; diff --git a/e2e_test/schema_registry/alter_sr.slt b/e2e_test/source_inline/kafka/protobuf/alter_source.slt similarity index 80% rename from e2e_test/schema_registry/alter_sr.slt rename to e2e_test/source_inline/kafka/protobuf/alter_source.slt index 051bebcd5ef32..c9db2df3ca4ee 100644 --- a/e2e_test/schema_registry/alter_sr.slt +++ b/e2e_test/source_inline/kafka/protobuf/alter_source.slt @@ -5,7 +5,7 @@ rpk topic delete sr_pb_test || true; \\ (rpk sr subject delete 'sr_pb_test-value' && rpk sr subject delete 'sr_pb_test-value' --permanent) || true; system ok -python3 e2e_test/schema_registry/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 20 user +python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 20 user statement ok CREATE SOURCE src_user @@ -42,7 +42,7 @@ SELECT age FROM t_user; # Push more events with extended fields system ok -python3 e2e_test/schema_registry/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 5 user_with_more_fields +python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 5 user_with_more_fields sleep 5s @@ -69,7 +69,7 @@ SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM t_user; # Push more events with extended fields system ok -python3 e2e_test/schema_registry/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 5 user_with_more_fields +python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 5 user_with_more_fields sleep 5s diff --git a/e2e_test/source_inline/kafka/protobuf/basic.slt b/e2e_test/source_inline/kafka/protobuf/basic.slt new file mode 100644 index 0000000000000..82eb61560aa4d --- /dev/null +++ b/e2e_test/source_inline/kafka/protobuf/basic.slt @@ -0,0 +1,58 @@ +control substitution on + +system ok +rpk topic delete sr_pb_test || true; \\ +(rpk sr subject delete 'sr_pb_test-value' && rpk sr subject delete 'sr_pb_test-value' --permanent) || true; + +system ok +python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 20 user + +# make sure google/protobuf/source_context.proto is NOT in schema registry +system ok +curl --silent '${RISEDEV_SCHEMA_REGISTRY_URL}' | grep -v 'google/protobuf/source_context.proto' + +# Create a table. +statement ok +create table sr_pb_test with ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'sr_pb_test', + scan.startup.mode = 'earliest') +FORMAT plain ENCODE protobuf( + schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}', + message = 'test.User' + ); + +# for multiple schema registry nodes +statement ok +create table sr_pb_test_bk with ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'sr_pb_test', + scan.startup.mode = 'earliest') +FORMAT plain ENCODE protobuf( + schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL},${RISEDEV_SCHEMA_REGISTRY_URL}', + message = 'test.User' + ); + +# Wait for source +sleep 2s + +# Flush into storage +statement ok +flush; + +query I +select count(*) from sr_pb_test; +---- +20 + +query IT +select min(id), max(id), max((sc).file_name) from sr_pb_test; +---- +0 19 source/context_019.proto + + +statement ok +drop table sr_pb_test; + +statement ok +drop table sr_pb_test_bk; diff --git a/e2e_test/schema_registry/pb.py b/e2e_test/source_inline/kafka/protobuf/pb.py similarity index 74% rename from e2e_test/schema_registry/pb.py rename to e2e_test/source_inline/kafka/protobuf/pb.py index f970353be56ae..4cab50f899e50 100644 --- a/e2e_test/schema_registry/pb.py +++ b/e2e_test/source_inline/kafka/protobuf/pb.py @@ -25,6 +25,7 @@ def get_user(i): sc=SourceContext(file_name="source/context_{:03}.proto".format(i)), ) + def get_user_with_more_fields(i): return user_pb2.User( id=i, @@ -36,12 +37,15 @@ def get_user_with_more_fields(i): age=100 + i, ) -def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_user_fn, pb_message): + +def send_to_kafka( + producer_conf, schema_registry_conf, topic, num_records, get_user_fn, pb_message +): schema_registry_client = SchemaRegistryClient(schema_registry_conf) serializer = ProtobufSerializer( pb_message, schema_registry_client, - {"use.deprecated.format": False, 'skip.known.types': True}, + {"use.deprecated.format": False, "skip.known.types": True}, ) producer = Producer(producer_conf) @@ -60,7 +64,9 @@ def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_u if __name__ == "__main__": if len(sys.argv) < 6: - print("pb.py ") + print( + "pb.py " + ) exit(1) broker_list = sys.argv[1] @@ -69,20 +75,29 @@ def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_u num_records = int(sys.argv[4]) pb_message = sys.argv[5] - user_pb2 = importlib.import_module(f'protobuf.{pb_message}_pb2') + user_pb2 = importlib.import_module(f"{pb_message}_pb2") all_pb_messages = { - 'user': get_user, - 'user_with_more_fields': get_user_with_more_fields, + "user": get_user, + "user_with_more_fields": get_user_with_more_fields, } - assert pb_message in all_pb_messages, f'pb_message must be one of {list(all_pb_messages.keys())}' + assert ( + pb_message in all_pb_messages + ), f"pb_message must be one of {list(all_pb_messages.keys())}" schema_registry_conf = {"url": schema_registry_url} producer_conf = {"bootstrap.servers": broker_list} try: - send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, all_pb_messages[pb_message], user_pb2.User) + send_to_kafka( + producer_conf, + schema_registry_conf, + topic, + num_records, + all_pb_messages[pb_message], + user_pb2.User, + ) except Exception as e: print("Send Protobuf data to schema registry and kafka failed {}", e) exit(1) diff --git a/e2e_test/schema_registry/protobuf/user.proto b/e2e_test/source_inline/kafka/protobuf/user.proto similarity index 100% rename from e2e_test/schema_registry/protobuf/user.proto rename to e2e_test/source_inline/kafka/protobuf/user.proto diff --git a/e2e_test/schema_registry/protobuf/user_pb2.py b/e2e_test/source_inline/kafka/protobuf/user_pb2.py similarity index 100% rename from e2e_test/schema_registry/protobuf/user_pb2.py rename to e2e_test/source_inline/kafka/protobuf/user_pb2.py diff --git a/e2e_test/schema_registry/protobuf/user_with_more_fields.proto b/e2e_test/source_inline/kafka/protobuf/user_with_more_fields.proto similarity index 100% rename from e2e_test/schema_registry/protobuf/user_with_more_fields.proto rename to e2e_test/source_inline/kafka/protobuf/user_with_more_fields.proto diff --git a/e2e_test/schema_registry/protobuf/user_with_more_fields_pb2.py b/e2e_test/source_inline/kafka/protobuf/user_with_more_fields_pb2.py similarity index 100% rename from e2e_test/schema_registry/protobuf/user_with_more_fields_pb2.py rename to e2e_test/source_inline/kafka/protobuf/user_with_more_fields_pb2.py