Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): kafka upsert sink with schema #12113

Merged
merged 23 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions ci/scripts/e2e-kafka-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ set -euo pipefail

./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-append-only --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert-schema --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --create > /dev/null 2>&1

sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/create_sink.slt'
Expand All @@ -28,6 +29,15 @@ if [ $? -ne 0 ]; then
exit 1
fi

# test upsert kafka sink with schema
echo "testing upsert kafka sink with schema"
diff ./e2e_test/sink/kafka/upsert_schema1.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 10 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink with schema is not as expected."
exit 1
fi

# test debezium kafka sink
echo "testing debezium kafka sink"
(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 10 | sort) > ./e2e_test/sink/kafka/debezium1.tmp.result 2> /dev/null
Expand Down Expand Up @@ -62,6 +72,15 @@ if [ $? -ne 0 ]; then
exit 1
fi

# test upsert kafka sink with schema after update
echo "testing upsert kafka sink with schema after updating data"
diff ./e2e_test/sink/kafka/upsert_schema2.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 11 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink with schema is not as expected."
exit 1
fi

# test debezium kafka sink after update
echo "testing debezium kafka sink after updating data"
(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 11 | sort) > ./e2e_test/sink/kafka/debezium2.tmp.result 2> /dev/null
Expand All @@ -87,6 +106,15 @@ if [ $? -ne 0 ]; then
exit 1
fi

# test upsert kafka sink with schema after delete
echo "testing upsert kafka sink with schema after deleting data"
diff ./e2e_test/sink/kafka/upsert_schema3.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 12 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink with schema is not as expected."
exit 1
fi

# test debezium kafka sink after delete
echo "testing debezium kafka sink after deleting data"
(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 13 | sort) > ./e2e_test/sink/kafka/debezium3.tmp.result 2> /dev/null
Expand Down
11 changes: 11 additions & 0 deletions e2e_test/sink/kafka/create_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,17 @@ create sink si_kafka_upsert from t_kafka with (
primary_key = 'id',
);

statement ok
create sink si_kafka_upsert_schema from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = '127.0.0.1:29092',
topic = 'test-rw-sink-upsert-schema',
type = 'upsert',
Rossil2012 marked this conversation as resolved.
Show resolved Hide resolved
primary_key = 'id',
) format upsert encode json (
schema.enable = true
);

statement ok
create sink si_kafka_debezium from t_kafka with (
connector = 'kafka',
Expand Down
3 changes: 3 additions & 0 deletions e2e_test/sink/kafka/drop_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,8 @@ drop sink si_kafka_upsert;
statement ok
drop sink si_kafka_debezium;

statement ok
drop sink si_kafka_upsert_schema;

statement ok
drop table t_kafka;
10 changes: 10 additions & 0 deletions e2e_test/sink/kafka/upsert_schema1.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{"payload":{"id":10},"schema":{"fields":[{"field":"id","optional":"true","type":"int32"}],"name":"dev.t_kafka","optional":"false","type":"struct"}} {"payload":{"id":10,"v_bigint":20674,"v_double":9042.404483827513,"v_float":19387.23828125,"v_integer":20674,"v_smallint":26951,"v_timestamp":1681404058888,"v_varchar":"0oVqRIHqkb"},"schema":{"fields":[{"field":"id","optional":"true","type":"int32"},{"field":"v_varchar","optional":"true","type":"string"},{"field":"v_smallint","optional":"true","type":"int16"},{"field":"v_integer","optional":"true","type":"int32"},{"field":"v_bigint","optional":"true","type":"int64"},{"field":"v_float","optional":"true","type":"float"},{"field":"v_double","optional":"true","type":"double"},{"field":"v_timestamp","optional":"true","type":"int64"}],"name":"dev.t_kafka","optional":"false","type":"struct"}}
{"payload":{"id":1},"schema":{"fields":[{"field":"id","optional":"true","type":"int32"}],"name":"dev.t_kafka","optional":"false","type":"struct"}} {"payload":{"id":1,"v_bigint":1872,"v_double":23956.39329760601,"v_float":26261.416015625,"v_integer":1872,"v_smallint":31031,"v_timestamp":1681453634104,"v_varchar":"8DfUFencLe"},"schema":{"fields":[{"field":"id","optional":"true","type":"int32"},{"field":"v_varchar","optional":"true","type":"string"},{"field":"v_smallint","optional":"true","type":"int16"},{"field":"v_integer","optional":"true","type":"int32"},{"field":"v_bigint","optional":"true","type":"int64"},{"field":"v_float","optional":"true","type":"float"},{"field":"v_double","optional":"true","type":"double"},{"field":"v_timestamp","optional":"true","type":"int64"}],"name":"dev.t_kafka","optional":"false","type":"struct"}}
{"payload":{"id":2},"schema":{"fields":[{"field":"id","optional":"true","type":"int32"}],"name":"dev.t_kafka","optional":"false","type":"struct"}} {"payload":{"id":2,"v_bigint":4598,"v_double":31923.077305746086,"v_float":27031.224609375,"v_integer":4598,"v_smallint":22690,"v_timestamp":1681429444869,"v_varchar":"sIo1XXVeHZ"},"schema":{"fields":[{"field":"id","optional":"true","type":"int32"},{"field":"v_varchar","optional":"true","type":"string"},{"field":"v_smallint","optional":"true","type":"int16"},{"field":"v_integer","optional":"true","type":"int32"},{"field":"v_bigint","optional":"true","type":"int64"},{"field":"v_float","optional":"true","type":"float"},{"field":"v_double","optional":"true","type":"double"},{"field":"v_timestamp","optional":"true","type":"int64"}],"name":"dev.t_kafka","optional":"false","type":"struct"}}
{"payload":{"id":3},"schema":{"fields":[{"field":"id","optional":"true","type":"int32"}],"name":"dev.t_kafka","optional":"false","type":"struct"}} {"payload":{"id":3,"v_bigint":14894,"v_double":9742.475509566086,"v_float":2660.290283203125,"v_integer":5894,"v_smallint":5985,"v_timestamp":1681429011269,"v_varchar":"LVLAhd1pQv"},"schema":{"fields":[{"field":"id","optional":"true","type":"int32"},{"field":"v_varchar","optional":"true","type":"string"},{"field":"v_smallint","optional":"true","type":"int16"},{"field":"v_integer","optional":"true","type":"int32"},{"field":"v_bigint","optional":"true","type":"int64"},{"field":"v_float","optional":"true","type":"float"},{"field":"v_double","optional":"true","type":"double"},{"field":"v_timestamp","optional":"true","type":"int64"}],"name":"dev.t_kafka","optional":"false","type":"struct"}}
{"payload":{"id":4},"schema":{"fields":[{"field":"id","optional":"true","type":"int32"}],"name":"dev.t_kafka","optional":"false","type":"struct"}} {"payload":{"id":4,"v_bigint":24962,"v_double":3119.719721891862,"v_float":21217.77734375,"v_integer":7406,"v_smallint":6306,"v_timestamp":1681434727993,"v_varchar":"ORjwy3oMNb"},"schema":{"fields":[{"field":"id","optional":"true","type":"int32"},{"field":"v_varchar","optional":"true","type":"string"},{"field":"v_smallint","optional":"true","type":"int16"},{"field":"v_integer","optional":"true","type":"int32"},{"field":"v_bigint","optional":"true","type":"int64"},{"field":"v_float","optional":"true","type":"float"},{"field":"v_double","optional":"true","type":"double"},{"field":"v_timestamp","optional":"true","type":"int64"}],"name":"dev.t_kafka","optional":"false","type":"struct"}}
{"payload":{"id":5},"schema":{"fields":[{"field":"id","optional":"true","type":"int32"}],"name":"dev.t_kafka","optional":"false","type":"struct"}} {"payload":{"id":5,"v_bigint":9253,"v_double":17464.91553421121,"v_float":22749.5,"v_integer":9253,"v_smallint":22765,"v_timestamp":1681444642324,"v_varchar":"sSkKswxrYd"},"schema":{"fields":[{"field":"id","optional":"true","type":"int32"},{"field":"v_varchar","optional":"true","type":"string"},{"field":"v_smallint","optional":"true","type":"int16"},{"field":"v_integer","optional":"true","type":"int32"},{"field":"v_bigint","optional":"true","type":"int64"},{"field":"v_float","optional":"true","type":"float"},{"field":"v_double","optional":"true","type":"double"},{"field":"v_timestamp","optional":"true","type":"int64"}],"name":"dev.t_kafka","optional":"false","type":"struct"}}
{"payload":{"id":6},"schema":{"fields":[{"field":"id","optional":"true","type":"int32"}],"name":"dev.t_kafka","optional":"false","type":"struct"}} {"payload":{"id":6,"v_bigint":28842,"v_double":11210.458724794062,"v_float":5885.3681640625,"v_integer":10844,"v_smallint":4014,"v_timestamp":1681382522137,"v_varchar":"V4y71v4Gip"},"schema":{"fields":[{"field":"id","optional":"true","type":"int32"},{"field":"v_varchar","optional":"true","type":"string"},{"field":"v_smallint","optional":"true","type":"int16"},{"field":"v_integer","optional":"true","type":"int32"},{"field":"v_bigint","optional":"true","type":"int64"},{"field":"v_float","optional":"true","type":"float"},{"field":"v_double","optional":"true","type":"double"},{"field":"v_timestamp","optional":"true","type":"int64"}],"name":"dev.t_kafka","optional":"false","type":"struct"}}
{"payload":{"id":7},"schema":{"fields":[{"field":"id","optional":"true","type":"int32"}],"name":"dev.t_kafka","optional":"false","type":"struct"}} {"payload":{"id":7,"v_bigint":15914,"v_double":10967.182297153104,"v_float":3946.743408203125,"v_integer":12652,"v_smallint":10324,"v_timestamp":1681447263083,"v_varchar":"YIVLnWxHyf"},"schema":{"fields":[{"field":"id","optional":"true","type":"int32"},{"field":"v_varchar","optional":"true","type":"string"},{"field":"v_smallint","optional":"true","type":"int16"},{"field":"v_integer","optional":"true","type":"int32"},{"field":"v_bigint","optional":"true","type":"int64"},{"field":"v_float","optional":"true","type":"float"},{"field":"v_double","optional":"true","type":"double"},{"field":"v_timestamp","optional":"true","type":"int64"}],"name":"dev.t_kafka","optional":"false","type":"struct"}}
{"payload":{"id":8},"schema":{"fields":[{"field":"id","optional":"true","type":"int32"}],"name":"dev.t_kafka","optional":"false","type":"struct"}} {"payload":{"id":8,"v_bigint":28641,"v_double":993.408963466774,"v_float":13652.0732421875,"v_integer":19036,"v_smallint":194,"v_timestamp":1681393929356,"v_varchar":"lv7Eq3g8hx"},"schema":{"fields":[{"field":"id","optional":"true","type":"int32"},{"field":"v_varchar","optional":"true","type":"string"},{"field":"v_smallint","optional":"true","type":"int16"},{"field":"v_integer","optional":"true","type":"int32"},{"field":"v_bigint","optional":"true","type":"int64"},{"field":"v_float","optional":"true","type":"float"},{"field":"v_double","optional":"true","type":"double"},{"field":"v_timestamp","optional":"true","type":"int64"}],"name":"dev.t_kafka","optional":"false","type":"struct"}}
{"payload":{"id":9},"schema":{"fields":[{"field":"id","optional":"true","type":"int32"}],"name":"dev.t_kafka","optional":"false","type":"struct"}} {"payload":{"id":9,"v_bigint":24837,"v_double":11615.276406159757,"v_float":20699.55859375,"v_integer":20090,"v_smallint":10028,"v_timestamp":1681389642487,"v_varchar":"nwRq4zejSQ"},"schema":{"fields":[{"field":"id","optional":"true","type":"int32"},{"field":"v_varchar","optional":"true","type":"string"},{"field":"v_smallint","optional":"true","type":"int16"},{"field":"v_integer","optional":"true","type":"int32"},{"field":"v_bigint","optional":"true","type":"int64"},{"field":"v_float","optional":"true","type":"float"},{"field":"v_double","optional":"true","type":"double"},{"field":"v_timestamp","optional":"true","type":"int64"}],"name":"dev.t_kafka","optional":"false","type":"struct"}}
Loading
Loading