Skip to content

Commit

Permalink
e2e tests
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu committed Jun 12, 2024
1 parent 76afd87 commit ce79e72
Showing 1 changed file with 109 additions and 0 deletions.
109 changes: 109 additions & 0 deletions e2e_test/sink/kafka/avro.slt
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,117 @@ create sink sink0 from into_kafka with (
format upsert encode avro (
schema.registry = 'http://schemaregistry:8082');

system ok
rpk topic create test-rw-sink-plain-avro

system ok
jq '{"schema": tojson}' << EOF | curl -X POST -H 'content-type: application/json' -d @- 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-value/versions'
{
"type": "record",
"name": "Simple",
"fields": [
{
"name": "int32_field",
"type": ["null", "int"]
},
{
"name": "string_field",
"type": ["null", "string"]
}
]
}
EOF

statement ok
create table from_kafka_plain
include key as raw_key
with (
connector = 'kafka',
topic = 'test-rw-sink-plain-avro',
properties.bootstrap.server = 'message_queue:29092')
format plain encode avro (
schema.registry = 'http://schemaregistry:8082');

statement ok
create sink sink_plain_key_none as select int32_field, string_field from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-plain-avro',
properties.bootstrap.server = 'message_queue:29092')
format upsert encode avro (
schema.registry = 'http://schemaregistry:8082');

statement ok
create sink sink_plain_key_text as select int32_field, string_field from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-plain-avro',
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'int32_field')
format upsert encode avro (
schema.registry = 'http://schemaregistry:8082')
key encode text;

system ok
jq '{"schema": tojson}' << EOF | curl -X POST -H 'content-type: application/json' -d @- 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-key/versions'
{
"type": "record",
"name": "Key",
"fields": [
{
"name": "int32_field",
"type": ["null", "int"]
}
]
}
EOF

statement ok
create sink sink_plain_key_avro as select int32_field, string_field from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-plain-avro',
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'int32_field')
format upsert encode avro (
schema.registry = 'http://schemaregistry:8082');

sleep 2s

query TIT
select raw_key, int32_field, string_field from from_kafka_plain order by string_field, raw_key;
----
\x00????????2c 22 Rising
\x3232 22 Rising
NULL 22 Rising
\x00????????16 11 Wave
\x3131 11 Wave
NULL 11 Wave

statement ok
drop sink sink_plain_key_avro;

statement ok
drop sink sink_plain_key_text;

statement ok
drop sink sink_plain_key_none;

statement ok
drop table from_kafka_plain;

system ok
curl -X DELETE 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-key'

system ok
curl -X DELETE 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-key?permanent=true'

system ok
curl -X DELETE 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-value'

system ok
curl -X DELETE 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-value?permanent=true'

system ok
rpk topic delete test-rw-sink-plain-avro

query TTTRRIITTTTTTTT
select
bool_field,
Expand Down

0 comments on commit ce79e72

Please sign in to comment.