From ce79e72e161085a3434950aa07e2b51dbe735c5a Mon Sep 17 00:00:00 2001 From: Xiangjin Date: Wed, 12 Jun 2024 16:02:24 +0800 Subject: [PATCH] e2e tests --- e2e_test/sink/kafka/avro.slt | 109 +++++++++++++++++++++++++++++++++++ 1 file changed, 109 insertions(+) diff --git a/e2e_test/sink/kafka/avro.slt b/e2e_test/sink/kafka/avro.slt index 1cf27b811d9be..d9c5272ddb9cb 100644 --- a/e2e_test/sink/kafka/avro.slt +++ b/e2e_test/sink/kafka/avro.slt @@ -42,8 +42,117 @@ create sink sink0 from into_kafka with ( format upsert encode avro ( schema.registry = 'http://schemaregistry:8082'); +system ok +rpk topic create test-rw-sink-plain-avro + +system ok +jq '{"schema": tojson}' << EOF | curl -X POST -H 'content-type: application/json' -d @- 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-value/versions' +{ + "type": "record", + "name": "Simple", + "fields": [ + { + "name": "int32_field", + "type": ["null", "int"] + }, + { + "name": "string_field", + "type": ["null", "string"] + } + ] +} +EOF + +statement ok +create table from_kafka_plain +include key as raw_key +with ( + connector = 'kafka', + topic = 'test-rw-sink-plain-avro', + properties.bootstrap.server = 'message_queue:29092') +format plain encode avro ( + schema.registry = 'http://schemaregistry:8082'); + +statement ok +create sink sink_plain_key_none as select int32_field, string_field from into_kafka with ( + connector = 'kafka', + topic = 'test-rw-sink-plain-avro', + properties.bootstrap.server = 'message_queue:29092') +format upsert encode avro ( + schema.registry = 'http://schemaregistry:8082'); + +statement ok +create sink sink_plain_key_text as select int32_field, string_field from into_kafka with ( + connector = 'kafka', + topic = 'test-rw-sink-plain-avro', + properties.bootstrap.server = 'message_queue:29092', + primary_key = 'int32_field') +format upsert encode avro ( + schema.registry = 'http://schemaregistry:8082') +key encode text; + +system ok +jq '{"schema": tojson}' << EOF | curl -X POST -H 'content-type: application/json' -d @- 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-key/versions' +{ + "type": "record", + "name": "Key", + "fields": [ + { + "name": "int32_field", + "type": ["null", "int"] + } + ] +} +EOF + +statement ok +create sink sink_plain_key_avro as select int32_field, string_field from into_kafka with ( + connector = 'kafka', + topic = 'test-rw-sink-plain-avro', + properties.bootstrap.server = 'message_queue:29092', + primary_key = 'int32_field') +format upsert encode avro ( + schema.registry = 'http://schemaregistry:8082'); + sleep 2s +query TIT +select raw_key, int32_field, string_field from from_kafka_plain order by string_field, raw_key; +---- +\x00????????2c 22 Rising +\x3232 22 Rising +NULL 22 Rising +\x00????????16 11 Wave +\x3131 11 Wave +NULL 11 Wave + +statement ok +drop sink sink_plain_key_avro; + +statement ok +drop sink sink_plain_key_text; + +statement ok +drop sink sink_plain_key_none; + +statement ok +drop table from_kafka_plain; + +system ok +curl -X DELETE 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-key' + +system ok +curl -X DELETE 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-key?permanent=true' + +system ok +curl -X DELETE 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-value' + +system ok +curl -X DELETE 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-value?permanent=true' + +system ok +rpk topic delete test-rw-sink-plain-avro + query TTTRRIITTTTTTTT select bool_field,