Skip to content

Commit

Permalink
move tests
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Jun 19, 2024
1 parent 97e0a4d commit ef00148
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 74 deletions.
13 changes: 0 additions & 13 deletions ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -135,20 +135,7 @@ risedev slt './e2e_test/source/cdc/cdc_share_stream_drop.slt'

echo "--- Kill cluster"
risedev ci-kill

echo "--- e2e, ci-1cn-1fe, protobuf schema registry"
export RISINGWAVE_CI=true
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
risedev ci-start ci-1cn-1fe
echo "make sure google/protobuf/source_context.proto is NOT in schema registry"
curl --silent 'http://schemaregistry:8082/subjects'; echo
# curl --silent --head -X GET 'http://schemaregistry:8082/subjects/google%2Fprotobuf%2Fsource_context.proto/versions' | grep 404
curl --silent 'http://schemaregistry:8082/subjects' | grep -v 'google/protobuf/source_context.proto'
risedev slt './e2e_test/schema_registry/pb.slt'
risedev slt './e2e_test/schema_registry/alter_sr.slt'

echo "--- Kill cluster"
risedev ci-kill

echo "--- e2e, ci-kafka-plus-pubsub, kafka and pubsub source"
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
Expand Down
50 changes: 0 additions & 50 deletions e2e_test/schema_registry/pb.slt

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ rpk topic delete sr_pb_test || true; \\
(rpk sr subject delete 'sr_pb_test-value' && rpk sr subject delete 'sr_pb_test-value' --permanent) || true;

system ok
python3 e2e_test/schema_registry/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 20 user
python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 20 user

statement ok
CREATE SOURCE src_user
Expand Down Expand Up @@ -42,7 +42,7 @@ SELECT age FROM t_user;

# Push more events with extended fields
system ok
python3 e2e_test/schema_registry/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 5 user_with_more_fields
python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 5 user_with_more_fields

sleep 5s

Expand All @@ -69,7 +69,7 @@ SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM t_user;

# Push more events with extended fields
system ok
python3 e2e_test/schema_registry/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 5 user_with_more_fields
python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 5 user_with_more_fields

sleep 5s

Expand Down
58 changes: 58 additions & 0 deletions e2e_test/source_inline/kafka/protobuf/basic.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
control substitution on

system ok
rpk topic delete sr_pb_test || true; \\
(rpk sr subject delete 'sr_pb_test-value' && rpk sr subject delete 'sr_pb_test-value' --permanent) || true;

system ok
python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 20 user

# make sure google/protobuf/source_context.proto is NOT in schema registry
system
curl --silent '${RISEDEV_SCHEMA_REGISTRY_URL}' | grep -v 'google/protobuf/source_context.proto'

# Create a table.
statement ok
create table sr_pb_test with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'sr_pb_test',
scan.startup.mode = 'earliest')
FORMAT plain ENCODE protobuf(
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}',
message = 'test.User'
);

# for multiple schema registry nodes
statement ok
create table sr_pb_test_bk with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'sr_pb_test',
scan.startup.mode = 'earliest')
FORMAT plain ENCODE protobuf(
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL},${RISEDEV_SCHEMA_REGISTRY_URL}',
message = 'test.User'
);

# Wait for source
sleep 2s

# Flush into storage
statement ok
flush;

query I
select count(*) from sr_pb_test;
----
20

query IT
select min(id), max(id), max((sc).file_name) from sr_pb_test;
----
0 19 source/context_019.proto


statement ok
drop table sr_pb_test;

statement ok
drop table sr_pb_test_bk;
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def get_user(i):
sc=SourceContext(file_name="source/context_{:03}.proto".format(i)),
)


def get_user_with_more_fields(i):
return user_pb2.User(
id=i,
Expand All @@ -36,12 +37,15 @@ def get_user_with_more_fields(i):
age=100 + i,
)

def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_user_fn, pb_message):

def send_to_kafka(
producer_conf, schema_registry_conf, topic, num_records, get_user_fn, pb_message
):
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
serializer = ProtobufSerializer(
pb_message,
schema_registry_client,
{"use.deprecated.format": False, 'skip.known.types': True},
{"use.deprecated.format": False, "skip.known.types": True},
)

producer = Producer(producer_conf)
Expand All @@ -60,7 +64,9 @@ def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_u

if __name__ == "__main__":
if len(sys.argv) < 6:
print("pb.py <brokerlist> <schema-registry-url> <topic> <num-records> <pb_message>")
print(
"pb.py <brokerlist> <schema-registry-url> <topic> <num-records> <pb_message>"
)
exit(1)

broker_list = sys.argv[1]
Expand All @@ -69,20 +75,29 @@ def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_u
num_records = int(sys.argv[4])
pb_message = sys.argv[5]

user_pb2 = importlib.import_module(f'protobuf.{pb_message}_pb2')
user_pb2 = importlib.import_module(f"{pb_message}_pb2")

all_pb_messages = {
'user': get_user,
'user_with_more_fields': get_user_with_more_fields,
"user": get_user,
"user_with_more_fields": get_user_with_more_fields,
}

assert pb_message in all_pb_messages, f'pb_message must be one of {list(all_pb_messages.keys())}'
assert (
pb_message in all_pb_messages
), f"pb_message must be one of {list(all_pb_messages.keys())}"

schema_registry_conf = {"url": schema_registry_url}
producer_conf = {"bootstrap.servers": broker_list}

try:
send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, all_pb_messages[pb_message], user_pb2.User)
send_to_kafka(
producer_conf,
schema_registry_conf,
topic,
num_records,
all_pb_messages[pb_message],
user_pb2.User,
)
except Exception as e:
print("Send Protobuf data to schema registry and kafka failed {}", e)
exit(1)

0 comments on commit ef00148

Please sign in to comment.