From 2a0d269bf943b561ccad988284bfef47f7a61f3c Mon Sep 17 00:00:00 2001 From: Xiangjin Date: Tue, 24 Oct 2023 21:08:19 +0800 Subject: [PATCH] e2e test sink kafka upsert avro schema registry --- ci/scripts/e2e-kafka-sink-test.sh | 8 ++ e2e_test/sink/kafka/avro.slt | 110 +++++++++++++++++++++ e2e_test/sink/kafka/protobuf.slt | 9 ++ e2e_test/sink/kafka/register_schema.py | 48 +++++++++ src/connector/src/test_data/all-types.avsc | 69 +++++++++++++ 5 files changed, 244 insertions(+) create mode 100644 e2e_test/sink/kafka/avro.slt create mode 100644 e2e_test/sink/kafka/register_schema.py create mode 100644 src/connector/src/test_data/all-types.avsc diff --git a/ci/scripts/e2e-kafka-sink-test.sh b/ci/scripts/e2e-kafka-sink-test.sh index 71a91f2d8fba9..d51482a912235 100755 --- a/ci/scripts/e2e-kafka-sink-test.sh +++ b/ci/scripts/e2e-kafka-sink-test.sh @@ -138,3 +138,11 @@ cp src/connector/src/test_data/proto_recursive/recursive.pb ./proto-recursive ./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --create > /dev/null 2>&1 sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/protobuf.slt' ./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --delete > /dev/null 2>&1 + +echo "testing avro" +python3 -m pip install requests confluent-kafka +python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-value' src/connector/src/test_data/all-types.avsc +python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-key' src/connector/src/test_data/all-types.avsc 'string_field,int32_field' +./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-avro --create > /dev/null 2>&1 +sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/avro.slt' +./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 
message_queue:29092 --topic test-rw-sink-upsert-avro --delete > /dev/null 2>&1 diff --git a/e2e_test/sink/kafka/avro.slt b/e2e_test/sink/kafka/avro.slt new file mode 100644 index 0000000000000..e1b09e3608e37 --- /dev/null +++ b/e2e_test/sink/kafka/avro.slt @@ -0,0 +1,110 @@ +statement ok +create table from_kafka with ( + connector = 'kafka', + topic = 'test-rw-sink-upsert-avro', + properties.bootstrap.server = 'message_queue:29092') +format upsert encode avro ( + schema.registry = 'http://message_queue:8081'); + +statement ok +create table into_kafka ( + bool_field bool, + string_field varchar, + bytes_field bytea, + float_field real, + double_field double precision, + int32_field int, + int64_field bigint, + record_field struct<id int, name varchar>, + array_field int[][], + timestamp_micros_field timestamptz, + timestamp_millis_field timestamptz, + date_field date, + time_micros_field time, + time_millis_field time); + +statement ok +insert into into_kafka values + (true, 'Rising', 'a0', 3.5, 4.25, 22, 23, null, array[array[null, 3], null, array[7, null, 2]], '2006-01-02 15:04:05-07:00', null, null, '12:34:56.123456', null), + (false, 'Wave', 'ZDF', 1.5, null, 11, 12, row(null::int, 'foo'), null, null, '2006-01-02 15:04:05-07:00', '2021-04-01', null, '23:45:16.654321'); + +statement ok +flush; + +statement ok +create sink sink0 from into_kafka with ( + connector = 'kafka', + topic = 'test-rw-sink-upsert-avro', + properties.bootstrap.server = 'message_queue:29092', + primary_key = 'int32_field,string_field') +format upsert encode avro ( + schema.registry = 'http://message_queue:8081'); + +sleep 2s + +query TTTRRIITTTTTTTT +select + bool_field, + string_field, + bytes_field, + float_field, + double_field, + int32_field, + int64_field, + record_field, + array_field, + timestamp_micros_field, + timestamp_millis_field, + date_field, + time_micros_field, + time_millis_field from from_kafka; +---- +t Rising \x6130 3.5 4.25 22 23 NULL {{NULL,3},NULL,{7,NULL,2}} 2006-01-02 22:04:05+00:00 NULL 
NULL 12:34:56.123456 NULL +f Wave \x5a4446 1.5 NULL 11 12 (NULL,foo) NULL NULL 2006-01-02 22:04:05+00:00 2021-04-01 NULL 23:45:16.654 + +statement error SchemaFetchError +create sink sink_err from into_kafka with ( + connector = 'kafka', + topic = 'test-rw-sink-upsert-avro-err', + properties.bootstrap.server = 'message_queue:29092', + primary_key = 'int32_field,string_field') +format upsert encode avro ( + schema.registry = 'http://message_queue:8081'); + +statement error encode extra_column error: field not in avro +create sink sink_err as select 1 as extra_column, * from into_kafka with ( + connector = 'kafka', + topic = 'test-rw-sink-upsert-avro', + properties.bootstrap.server = 'message_queue:29092', + primary_key = 'int32_field,string_field') +format upsert encode avro ( + schema.registry = 'http://message_queue:8081'); + +statement error unrecognized +create sink sink_err from into_kafka with ( + connector = 'kafka', + topic = 'test-rw-sink-upsert-avro', + properties.bootstrap.server = 'message_queue:29092', + primary_key = 'int32_field,string_field') +format upsert encode avro ( + schema.registry = 'http://message_queue:8081', + schema.registry.name.strategy = 'typo'); + +statement error empty field key.message +create sink sink_err from into_kafka with ( + connector = 'kafka', + topic = 'test-rw-sink-upsert-avro', + properties.bootstrap.server = 'message_queue:29092', + primary_key = 'int32_field,string_field') +format upsert encode avro ( + schema.registry = 'http://message_queue:8081', + schema.registry.name.strategy = 'record_name_strategy'); + +statement ok +drop sink sink0; + +statement ok +drop table into_kafka; + +statement ok +drop table from_kafka; diff --git a/e2e_test/sink/kafka/protobuf.slt b/e2e_test/sink/kafka/protobuf.slt index 87ab884eddbde..2f827aeda9fc0 100644 --- a/e2e_test/sink/kafka/protobuf.slt +++ b/e2e_test/sink/kafka/protobuf.slt @@ -95,3 +95,12 @@ format plain encode protobuf ( force_append_only = true, schema.location = 
's3:///risingwave/proto-recursive', message = 'recursive.AllTypes'); + +statement ok +drop sink sink0; + +statement ok +drop table into_kafka; + +statement ok +drop table from_kafka; diff --git a/e2e_test/sink/kafka/register_schema.py b/e2e_test/sink/kafka/register_schema.py new file mode 100644 index 0000000000000..2606e07bcb89b --- /dev/null +++ b/e2e_test/sink/kafka/register_schema.py @@ -0,0 +1,48 @@ +import sys +from confluent_kafka.schema_registry import SchemaRegistryClient, Schema + + +def main(): + url = sys.argv[1] + subject = sys.argv[2] + with open(sys.argv[3]) as f: + schema_str = f.read() + if 4 < len(sys.argv): + keys = sys.argv[4].split(',') + else: + keys = [] + + client = SchemaRegistryClient({"url": url}) + + if keys: + schema_str = select_keys(schema_str, keys) + else: + schema_str = remove_unsupported(schema_str) + schema = Schema(schema_str, 'AVRO') + client.register_schema(subject, schema) + + +def select_fields(schema_str, f): + import json + root = json.loads(schema_str) + if not isinstance(root, dict): + return schema_str + if root['type'] != 'record': + return schema_str + root['fields'] = f(root['fields']) + return json.dumps(root) + + +def remove_unsupported(schema_str): + return select_fields(schema_str, lambda fields: [f for f in fields if f['name'] not in {'unsupported', 'mon_day_sec_field'}]) + + +def select_keys(schema_str, keys): + def process(fields): + by_name = {f['name']: f for f in fields} + return [by_name[k] for k in keys] + return select_fields(schema_str, process) + + +if __name__ == '__main__': + main() diff --git a/src/connector/src/test_data/all-types.avsc b/src/connector/src/test_data/all-types.avsc new file mode 100644 index 0000000000000..3fea69bbef4ca --- /dev/null +++ b/src/connector/src/test_data/all-types.avsc @@ -0,0 +1,69 @@ +{ + "type": "record", + "name": "AllTypes", + "fields": [ + {"name": "bool_field", "type": ["null", "boolean"]}, + {"name": "string_field", "type": ["null", "string"]}, + {"name": 
"bytes_field", "type": ["null", "bytes"]}, + {"name": "float_field", "type": ["null", "float"]}, + {"name": "double_field", "type": ["null", "double"]}, + {"name": "int32_field", "type": ["null", "int"]}, + {"name": "int64_field", "type": ["null", "long"]}, + {"name": "record_field", "type": ["null", { + "type": "record", + "name": "Nested", + "fields": [ + {"name": "id", "type": ["null", "int"]}, + {"name": "name", "type": ["null", "string"]} + ] + }]}, + {"name": "array_field", "type": ["null", { + "type": "array", + "items": ["null", { + "type": "array", + "items": ["null", "int"] + }] + }]}, + {"name": "timestamp_micros_field", "type": ["null", {"type": "long", "logicalType": "timestamp-micros"}]}, + {"name": "timestamp_millis_field", "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}]}, + {"name": "date_field", "type": ["null", {"type": "int", "logicalType": "date"}]}, + {"name": "time_micros_field", "type": ["null", {"type": "long", "logicalType": "time-micros"}]}, + {"name": "time_millis_field", "type": ["null", {"type": "int", "logicalType": "time-millis"}]}, + {"name": "mon_day_sec_field", "type": ["null", { + "type": "fixed", + "name": "Duration", + "size": 12, + "logicalType": "duration" + }]}, + {"name": "unsupported", "type": ["null", { + "type": "record", + "name": "Unsupported", + "fields": [ + {"name": "enum_field", "type": ["null", { + "type": "enum", + "name": "Suit", + "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"] + }]}, + {"name": "map_field", "type": ["null", { + "type": "map", + "values": ["null", "string"] + }]}, + {"name": "union_field", "type": ["null", "string", "double", "boolean"]}, + {"name": "fixed_field", "type": ["null", { + "type": "fixed", + "name": "Int256", + "size": 32 + }]}, + {"name": "decimal_field", "type": ["null", { + "type": "bytes", + "logicalType": "decimal", + "precision": 38, + "scale": 10 + }]}, + {"name": "uuid_field", "type": ["null", {"type": "string", "logicalType": "uuid"}]}, + {"name": 
"local_micros_field", "type": ["null", {"type": "long", "logicalType": "local-timestamp-micros"}]}, + {"name": "local_millis_field", "type": ["null", {"type": "long", "logicalType": "local-timestamp-millis"}]} + ] + }]} + ] +}