diff --git a/ci/scripts/e2e-sink-test.sh b/ci/scripts/e2e-sink-test.sh index 5125ddedfdf8e..eaee563c7a992 100755 --- a/ci/scripts/e2e-sink-test.sh +++ b/ci/scripts/e2e-sink-test.sh @@ -45,7 +45,7 @@ mysql --host=mysql --port=3306 -u root -p123456 test < ./e2e_test/sink/remote/my echo "--- preparing postgresql" # set up PG sink destination -apt-get -y install postgresql-client +apt-get -y install postgresql-client jq export PGPASSWORD=postgres psql -h db -U postgres -c "CREATE ROLE test LOGIN SUPERUSER PASSWORD 'connector';" createdb -h db -U postgres test diff --git a/e2e_test/sink/kafka/avro.slt b/e2e_test/sink/kafka/avro.slt index 1cf27b811d9be..9bb9e3395a34e 100644 --- a/e2e_test/sink/kafka/avro.slt +++ b/e2e_test/sink/kafka/avro.slt @@ -42,8 +42,126 @@ create sink sink0 from into_kafka with ( format upsert encode avro ( schema.registry = 'http://schemaregistry:8082'); +system ok +rpk topic create test-rw-sink-plain-avro + +system ok +jq '{"schema": tojson}' << EOF | curl -X POST -H 'content-type: application/json' -d @- 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-value/versions' +{ + "type": "record", + "name": "Simple", + "fields": [ + { + "name": "int32_field", + "type": ["null", "int"] + }, + { + "name": "string_field", + "type": ["null", "string"] + } + ] +} +EOF + +statement ok +create table from_kafka_plain +include key as raw_key +with ( + connector = 'kafka', + topic = 'test-rw-sink-plain-avro', + properties.bootstrap.server = 'message_queue:29092') +format plain encode avro ( + schema.registry = 'http://schemaregistry:8082'); + +statement ok +create sink sink_plain_key_none as select int32_field, string_field from into_kafka with ( + connector = 'kafka', + topic = 'test-rw-sink-plain-avro', + properties.bootstrap.server = 'message_queue:29092') +format plain encode avro ( + force_append_only = true, + schema.registry = 'http://schemaregistry:8082'); + +statement ok +create sink sink_plain_key_text as select int32_field, string_field from into_kafka with ( + connector = 'kafka', + topic = 'test-rw-sink-plain-avro', + properties.bootstrap.server = 'message_queue:29092', + primary_key = 'int32_field') +format plain encode avro ( + force_append_only = true, + schema.registry = 'http://schemaregistry:8082') +key encode text; + +system ok +jq '{"schema": tojson}' << EOF | curl -X POST -H 'content-type: application/json' -d @- 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-key/versions' +{ + "type": "record", + "name": "Key", + "fields": [ + { + "name": "int32_field", + "type": ["null", "int"] + } + ] +} +EOF + +statement ok +create sink sink_plain_key_avro as select int32_field, string_field from into_kafka with ( + connector = 'kafka', + topic = 'test-rw-sink-plain-avro', + properties.bootstrap.server = 'message_queue:29092', + primary_key = 'int32_field') +format plain encode avro ( + force_append_only = true, + schema.registry = 'http://schemaregistry:8082'); + sleep 2s +query ITT +select + int32_field, + string_field, + case when octet_length(raw_key) > 5 and substring(raw_key from 1 for 1) = '\x00' + then substring(raw_key from 6) -- skip indeterministic confluent schema registry header + else raw_key end +from from_kafka_plain order by string_field, raw_key; +---- +22 Rising \x022c +22 Rising \x3232 +22 Rising NULL +11 Wave \x0216 +11 Wave \x3131 +11 Wave NULL + +statement ok +drop sink sink_plain_key_avro; + +statement ok +drop sink sink_plain_key_text; + +statement ok +drop sink sink_plain_key_none; + +statement ok +drop table from_kafka_plain; + +system ok +curl -X DELETE 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-key' + +system ok +curl -X DELETE 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-key?permanent=true' + +system ok +curl -X DELETE 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-value' + +system ok +curl -X DELETE 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-value?permanent=true' + +system ok +rpk topic delete test-rw-sink-plain-avro + query TTTRRIITTTTTTTT select bool_field,