e2e test sink kafka upsert avro schema registry
xiangjinwu committed Oct 25, 2023
1 parent 0842b86 commit 2a0d269
Showing 5 changed files with 244 additions and 0 deletions.
8 changes: 8 additions & 0 deletions ci/scripts/e2e-kafka-sink-test.sh
@@ -138,3 +138,11 @@ cp src/connector/src/test_data/proto_recursive/recursive.pb ./proto-recursive
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --create > /dev/null 2>&1
sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/protobuf.slt'
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --delete > /dev/null 2>&1

echo "testing avro"
python3 -m pip install requests confluent-kafka
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-value' src/connector/src/test_data/all-types.avsc
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-key' src/connector/src/test_data/all-types.avsc 'string_field,int32_field'
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-avro --create > /dev/null 2>&1
sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/avro.slt'
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-avro --delete > /dev/null 2>&1
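
If the registration step misbehaves, the subjects can be inspected directly with the same confluent-kafka client installed above. A minimal sketch, not part of the CI script, assuming the registry at message_queue:8081 is reachable:

from confluent_kafka.schema_registry import SchemaRegistryClient

client = SchemaRegistryClient({"url": "http://message_queue:8081"})
for subject in ("test-rw-sink-upsert-avro-key", "test-rw-sink-upsert-avro-value"):
    latest = client.get_latest_version(subject)
    print(subject, "-> version", latest.version, "schema id", latest.schema_id)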
110 changes: 110 additions & 0 deletions e2e_test/sink/kafka/avro.slt
@@ -0,0 +1,110 @@
statement ok
create table from_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-upsert-avro',
properties.bootstrap.server = 'message_queue:29092')
format upsert encode avro (
schema.registry = 'http://message_queue:8081');

statement ok
create table into_kafka (
bool_field bool,
string_field varchar,
bytes_field bytea,
float_field real,
double_field double precision,
int32_field int,
int64_field bigint,
record_field struct<id int, name varchar>,
array_field int[][],
timestamp_micros_field timestamptz,
timestamp_millis_field timestamptz,
date_field date,
time_micros_field time,
time_millis_field time);

statement ok
insert into into_kafka values
(true, 'Rising', 'a0', 3.5, 4.25, 22, 23, null, array[array[null, 3], null, array[7, null, 2]], '2006-01-02 15:04:05-07:00', null, null, '12:34:56.123456', null),
(false, 'Wave', 'ZDF', 1.5, null, 11, 12, row(null::int, 'foo'), null, null, '2006-01-02 15:04:05-07:00', '2021-04-01', null, '23:45:16.654321');

statement ok
flush;

statement ok
create sink sink0 from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-upsert-avro',
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'int32_field,string_field')
format upsert encode avro (
schema.registry = 'http://message_queue:8081');

sleep 2s
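
# Note: the *_millis columns keep only millisecond precision, so the
# 23:45:16.654321 inserted above comes back as 23:45:16.654 below.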

query TTTRRIITTTTTTTT
select
bool_field,
string_field,
bytes_field,
float_field,
double_field,
int32_field,
int64_field,
record_field,
array_field,
timestamp_micros_field,
timestamp_millis_field,
date_field,
time_micros_field,
time_millis_field from from_kafka;
----
t Rising \x6130 3.5 4.25 22 23 NULL {{NULL,3},NULL,{7,NULL,2}} 2006-01-02 22:04:05+00:00 NULL NULL 12:34:56.123456 NULL
f Wave \x5a4446 1.5 NULL 11 12 (NULL,foo) NULL NULL 2006-01-02 22:04:05+00:00 2021-04-01 NULL 23:45:16.654
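
# No schemas were registered for the -err topic, so the schema fetch fails.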

statement error SchemaFetchError
create sink sink_err from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-upsert-avro-err',
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'int32_field,string_field')
format upsert encode avro (
schema.registry = 'http://message_queue:8081');

statement error encode extra_column error: field not in avro
create sink sink_err as select 1 as extra_column, * from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-upsert-avro',
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'int32_field,string_field')
format upsert encode avro (
schema.registry = 'http://message_queue:8081');

statement error unrecognized
create sink sink_err from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-upsert-avro',
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'int32_field,string_field')
format upsert encode avro (
schema.registry = 'http://message_queue:8081',
schema.registry.name.strategy = 'typo');
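
# record_name_strategy additionally requires key.message to be specified.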

statement error empty field key.message
create sink sink_err from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-upsert-avro',
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'int32_field,string_field')
format upsert encode avro (
schema.registry = 'http://message_queue:8081',
schema.registry.name.strategy = 'record_name_strategy');

statement ok
drop sink sink0;

statement ok
drop table into_kafka;

statement ok
drop table from_kafka;
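
To eyeball the key/value pairs the sink actually wrote, a throwaway consumer can decode both through the registry. A sketch, assuming the test environment above is still up; the group id and poll timeout are arbitrary:

from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

registry = SchemaRegistryClient({"url": "http://message_queue:8081"})
consumer = DeserializingConsumer({
    "bootstrap.servers": "message_queue:29092",
    "group.id": "avro-sink-debug",  # arbitrary throwaway group id
    "auto.offset.reset": "earliest",
    "key.deserializer": AvroDeserializer(registry),
    "value.deserializer": AvroDeserializer(registry),
})
consumer.subscribe(["test-rw-sink-upsert-avro"])
msg = consumer.poll(10.0)
if msg is not None and msg.error() is None:
    print("key:", msg.key())      # e.g. {'string_field': 'Rising', 'int32_field': 22}
    print("value:", msg.value())
consumer.close()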
9 changes: 9 additions & 0 deletions e2e_test/sink/kafka/protobuf.slt
@@ -95,3 +95,12 @@ format plain encode protobuf (
force_append_only = true,
schema.location = 's3:///risingwave/proto-recursive',
message = 'recursive.AllTypes');

statement ok
drop sink sink0;

statement ok
drop table into_kafka;

statement ok
drop table from_kafka;
48 changes: 48 additions & 0 deletions e2e_test/sink/kafka/register_schema.py
@@ -0,0 +1,48 @@
import json
import sys

from confluent_kafka.schema_registry import Schema, SchemaRegistryClient


def main():
    url = sys.argv[1]
    subject = sys.argv[2]
    with open(sys.argv[3]) as f:
        schema_str = f.read()
    if len(sys.argv) > 4:
        keys = sys.argv[4].split(',')
    else:
        keys = []

    client = SchemaRegistryClient({"url": url})

    # With key fields given, register the key subject; otherwise register the
    # value subject with the not-yet-supported fields stripped out.
    if keys:
        schema_str = select_keys(schema_str, keys)
    else:
        schema_str = remove_unsupported(schema_str)
    schema = Schema(schema_str, 'AVRO')
    client.register_schema(subject, schema)


def select_fields(schema_str, f):
    """Apply `f` to the field list of a record schema; pass anything else through."""
    root = json.loads(schema_str)
    if not isinstance(root, dict):
        return schema_str
    if root['type'] != 'record':
        return schema_str
    root['fields'] = f(root['fields'])
    return json.dumps(root)


def remove_unsupported(schema_str):
    # Drop the fields exercising Avro types the sink does not support yet.
    return select_fields(schema_str, lambda fields: [f for f in fields if f['name'] not in {'unsupported', 'mon_day_sec_field'}])


def select_keys(schema_str, keys):
    # Project the record down to the given key fields, in the order listed.
    def process(fields):
        by_name = {f['name']: f for f in fields}
        return [by_name[k] for k in keys]
    return select_fields(schema_str, process)


if __name__ == '__main__':
    main()
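
As a quick illustration of the key-subject path, a hypothetical snippet (run from the same directory, so the script imports as a module) showing that `select_keys` keeps only the listed fields, in primary-key order rather than declaration order:

import json

from register_schema import select_keys  # works when run next to the script

toy = json.dumps({
    "type": "record",
    "name": "Toy",
    "fields": [
        {"name": "int32_field", "type": "int"},
        {"name": "string_field", "type": "string"},
        {"name": "double_field", "type": "double"},
    ],
})
# Only the two key fields survive, ordered as listed, not as declared.
print(select_keys(toy, ["string_field", "int32_field"]))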
69 changes: 69 additions & 0 deletions src/connector/src/test_data/all-types.avsc
@@ -0,0 +1,69 @@
{
    "type": "record",
    "name": "AllTypes",
    "fields": [
        {"name": "bool_field", "type": ["null", "boolean"]},
        {"name": "string_field", "type": ["null", "string"]},
        {"name": "bytes_field", "type": ["null", "bytes"]},
        {"name": "float_field", "type": ["null", "float"]},
        {"name": "double_field", "type": ["null", "double"]},
        {"name": "int32_field", "type": ["null", "int"]},
        {"name": "int64_field", "type": ["null", "long"]},
        {"name": "record_field", "type": ["null", {
            "type": "record",
            "name": "Nested",
            "fields": [
                {"name": "id", "type": ["null", "int"]},
                {"name": "name", "type": ["null", "string"]}
            ]
        }]},
        {"name": "array_field", "type": ["null", {
            "type": "array",
            "items": ["null", {
                "type": "array",
                "items": ["null", "int"]
            }]
        }]},
        {"name": "timestamp_micros_field", "type": ["null", {"type": "long", "logicalType": "timestamp-micros"}]},
        {"name": "timestamp_millis_field", "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}]},
        {"name": "date_field", "type": ["null", {"type": "int", "logicalType": "date"}]},
        {"name": "time_micros_field", "type": ["null", {"type": "long", "logicalType": "time-micros"}]},
        {"name": "time_millis_field", "type": ["null", {"type": "int", "logicalType": "time-millis"}]},
        {"name": "mon_day_sec_field", "type": ["null", {
            "type": "fixed",
            "name": "Duration",
            "size": 12,
            "logicalType": "duration"
        }]},
        {"name": "unsupported", "type": ["null", {
            "type": "record",
            "name": "Unsupported",
            "fields": [
                {"name": "enum_field", "type": ["null", {
                    "type": "enum",
                    "name": "Suit",
                    "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
                }]},
                {"name": "map_field", "type": ["null", {
                    "type": "map",
                    "values": ["null", "string"]
                }]},
                {"name": "union_field", "type": ["null", "string", "double", "boolean"]},
                {"name": "fixed_field", "type": ["null", {
                    "type": "fixed",
                    "name": "Int256",
                    "size": 32
                }]},
                {"name": "decimal_field", "type": ["null", {
                    "type": "bytes",
                    "logicalType": "decimal",
                    "precision": 38,
                    "scale": 10
                }]},
                {"name": "uuid_field", "type": ["null", {"type": "string", "logicalType": "uuid"}]},
                {"name": "local_micros_field", "type": ["null", {"type": "long", "logicalType": "local-timestamp-micros"}]},
                {"name": "local_millis_field", "type": ["null", {"type": "long", "logicalType": "local-timestamp-millis"}]}
            ]
        }]}
    ]
}
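
As a local sanity check (not used by the test), the schema can be parsed with fastavro to confirm it is well-formed; assumes `pip install fastavro`, run from the repository root:

import json

from fastavro import parse_schema  # pip install fastavro

with open("src/connector/src/test_data/all-types.avsc") as f:
    schema = json.load(f)

parse_schema(schema)  # raises on a malformed schema; unknown logical types are ignored
print(sorted(field["name"] for field in schema["fields"]))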
