From 5c33a8d71cb5182cdd2414969f4c6e27cbb8f7e2 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Fri, 24 May 2024 18:15:55 +0800 Subject: [PATCH] correctly handle null value Signed-off-by: Bugen Zhao --- ci/scripts/e2e-kafka-sink-test.sh | 18 +++++++++--------- e2e_test/sink/kafka/debezium.py | 14 ++++++-------- e2e_test/sink/kafka/debezium3.result | 2 +- e2e_test/sink/kafka/upsert3.result | 2 +- e2e_test/sink/kafka/upsert_schema3.result | 2 +- 5 files changed, 18 insertions(+), 20 deletions(-) diff --git a/ci/scripts/e2e-kafka-sink-test.sh b/ci/scripts/e2e-kafka-sink-test.sh index 14bf1b2711936..206ce4ba1d75d 100755 --- a/ci/scripts/e2e-kafka-sink-test.sh +++ b/ci/scripts/e2e-kafka-sink-test.sh @@ -17,7 +17,7 @@ sleep 2 # test append-only kafka sink echo "testing append-only kafka sink" -diff ./e2e_test/sink/kafka/append_only1.result \ +diff -b ./e2e_test/sink/kafka/append_only1.result \ <((rpk topic consume test-rw-sink-append-only --offset start --format '%v\n' --num 10 | sort) 2> /dev/null) if [ $? -ne 0 ]; then echo "The output for append-only sink is not as expected." @@ -26,7 +26,7 @@ fi # test upsert kafka sink echo "testing upsert kafka sink" -diff ./e2e_test/sink/kafka/upsert1.result \ +diff -b ./e2e_test/sink/kafka/upsert1.result \ <((rpk topic consume test-rw-sink-upsert --offset start --format '%k\t%v\n' --num 10 | sort) 2> /dev/null) if [ $? -ne 0 ]; then echo "The output for upsert sink is not as expected." @@ -35,7 +35,7 @@ fi # test upsert kafka sink with schema echo "testing upsert kafka sink with schema" -diff ./e2e_test/sink/kafka/upsert_schema1.result \ +diff -b ./e2e_test/sink/kafka/upsert_schema1.result \ <((rpk topic consume test-rw-sink-upsert-schema --offset start --format '%k\t%v\n' --num 10 | sort) 2> /dev/null) if [ $? -ne 0 ]; then echo "The output for upsert sink with schema is not as expected." @@ -60,7 +60,7 @@ psql -h localhost -p 4566 -d dev -U root -c "update t_kafka set v_varchar = '', # test append-only kafka sink after update echo "testing append-only kafka sink after updating data" -diff ./e2e_test/sink/kafka/append_only2.result \ +diff -b ./e2e_test/sink/kafka/append_only2.result \ <((rpk topic consume test-rw-sink-append-only --offset start --format '%v\n' --num 11 | sort) 2> /dev/null) if [ $? -ne 0 ]; then echo "The output for append-only sink after update is not as expected." @@ -69,7 +69,7 @@ fi # test upsert kafka sink after update echo "testing upsert kafka sink after updating data" -diff ./e2e_test/sink/kafka/upsert2.result \ +diff -b ./e2e_test/sink/kafka/upsert2.result \ <((rpk topic consume test-rw-sink-upsert --offset start --format '%k\t%v\n' --num 11 | sort) 2> /dev/null) if [ $? -ne 0 ]; then echo "The output for upsert sink after update is not as expected." @@ -78,7 +78,7 @@ fi # test upsert kafka sink with schema after update echo "testing upsert kafka sink with schema after updating data" -diff ./e2e_test/sink/kafka/upsert_schema2.result \ +diff -b ./e2e_test/sink/kafka/upsert_schema2.result \ <((rpk topic consume test-rw-sink-upsert-schema --offset start --format '%k\t%v\n' --num 11 | sort) 2> /dev/null) if [ $? -ne 0 ]; then echo "The output for upsert sink with schema is not as expected." @@ -99,7 +99,7 @@ fi # test without-snapshot kafka sink echo "testing without-snapshot kafka sink" -diff ./e2e_test/sink/kafka/without_snapshot.result \ +diff -b ./e2e_test/sink/kafka/without_snapshot.result \ <((rpk topic consume test-rw-sink-without-snapshot --offset start --format '%v\n' --num 3 | sort) 2> /dev/null) if [ $? -ne 0 ]; then echo "The output for append-only sink is not as expected." @@ -112,7 +112,7 @@ psql -h localhost -p 4566 -d dev -U root -c "delete from t_kafka where id = 1;" # test upsert kafka sink after delete echo "testing upsert kafka sink after deleting data" -diff ./e2e_test/sink/kafka/upsert3.result \ +diff -b ./e2e_test/sink/kafka/upsert3.result \ <((rpk topic consume test-rw-sink-upsert --offset start --format '%k\t%v\n' --num 12 | sort) 2> /dev/null) if [ $? -ne 0 ]; then echo "The output for upsert sink after update is not as expected." @@ -121,7 +121,7 @@ fi # test upsert kafka sink with schema after delete echo "testing upsert kafka sink with schema after deleting data" -diff ./e2e_test/sink/kafka/upsert_schema3.result \ +diff -b ./e2e_test/sink/kafka/upsert_schema3.result \ <((rpk topic consume test-rw-sink-upsert-schema --offset start --format '%k\t%v\n' --num 12 | sort) 2> /dev/null) if [ $? -ne 0 ]; then echo "The output for upsert sink with schema is not as expected." diff --git a/e2e_test/sink/kafka/debezium.py b/e2e_test/sink/kafka/debezium.py index 4ec5e31281d88..5dbac1ae80216 100644 --- a/e2e_test/sink/kafka/debezium.py +++ b/e2e_test/sink/kafka/debezium.py @@ -11,9 +11,9 @@ # debezium sink sends k/v pair kv = line.split() key = json.loads(kv[0]) - # kafka consumer outputs string "null" for null payload - if kv[1] == "null": - value = kv[1] + # rpk output nothing for null payload + if len(kv) == 1: + value = None else: value = json.loads(kv[1]) # The `ts_ms` field may vary, so we delete it from the json object @@ -26,12 +26,10 @@ with open(test_output_file) as file: for line in file: kv = line.split() - if len(kv) != 2: - print(line) - assert(len(kv) == 2) key = json.loads(kv[0]) - if kv[1] == "null": - value = kv[1] + # rpk output nothing for null payload + if len(kv) == 1: + value = None else: value = json.loads(kv[1]) # Assert `ts_ms` is an integer here. diff --git a/e2e_test/sink/kafka/debezium3.result b/e2e_test/sink/kafka/debezium3.result index 196037c88b33b..b2d0235c2cfd2 100644 --- a/e2e_test/sink/kafka/debezium3.result +++ b/e2e_test/sink/kafka/debezium3.result @@ -1,5 +1,5 @@ {"payload":{"id":10},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"RisingWave.dev.t_kafka.Key","optional":false,"type":"struct"}} {"payload":{"after":{"id":10,"v_bigint":20674,"v_bytea":"AA==","v_date":0,"v_double":9042.404483827513,"v_float":19387.23828125,"v_integer":20674,"v_interval":"P0Y0M0DT5H1M0.123457S","v_jsonb":"{}","v_smallint":26951,"v_time":1000,"v_timestamp":1681404058888,"v_timestamptz":"1970-01-01T00:00:00.123456Z","v_varchar":"0oVqRIHqkb"},"before":null,"op":"c","source":{"db":"dev","table":"t_kafka","ts_ms":1700210120041},"ts_ms":1700210120041},"schema":{"fields":[{"field":"before","fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","name":"org.apache.kafka.connect.data.Timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","name":"io.debezium.time.Interval","optional":true,"type":"string"},{"field":"v_date","name":"org.apache.kafka.connect.data.Date","optional":true,"type":"int32"},{"field":"v_time","name":"org.apache.kafka.connect.data.Time","optional":true,"type":"int64"},{"field":"v_timestamptz","name":"io.debezium.time.ZonedTimestamp","optional":true,"type":"string"},{"field":"v_jsonb","name":"io.debezium.data.Json","optional":true,"type":"string"}],"name":"RisingWave.dev.t_kafka.Key","optional":true,"type":"struct"},{"field":"after","fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","name":"org.apache.kafka.connect.data.Timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","name":"io.debezium.time.Interval","optional":true,"type":"string"},{"field":"v_date","name":"org.apache.kafka.connect.data.Date","optional":true,"type":"int32"},{"field":"v_time","name":"org.apache.kafka.connect.data.Time","optional":true,"type":"int64"},{"field":"v_timestamptz","name":"io.debezium.time.ZonedTimestamp","optional":true,"type":"string"},{"field":"v_jsonb","name":"io.debezium.data.Json","optional":true,"type":"string"}],"name":"RisingWave.dev.t_kafka.Key","optional":true,"type":"struct"},{"field":"source","fields":[{"field":"db","optional":false,"type":"string"},{"field":"table","optional":true,"type":"string"},{"field":"ts_ms","optional":false,"type":"int64"}],"name":"RisingWave.dev.t_kafka.Source","optional":false,"type":"struct"},{"field":"op","optional":false,"type":"string"},{"field":"ts_ms","optional":false,"type":"int64"}],"name":"RisingWave.dev.t_kafka.Envelope","optional":false,"type":"struct"}} -{"payload":{"id":1},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"RisingWave.dev.t_kafka.Key","optional":false,"type":"struct"}} null +{"payload":{"id":1},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"RisingWave.dev.t_kafka.Key","optional":false,"type":"struct"}} {"payload":{"id":1},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"RisingWave.dev.t_kafka.Key","optional":false,"type":"struct"}} {"payload":{"after":null,"before":{"id":1,"v_bigint":0,"v_bytea":"AA==","v_date":-719162,"v_double":0.0,"v_float":0.0,"v_integer":0,"v_interval":"P0Y0M0DT0H0M0S","v_jsonb":"{}","v_smallint":0,"v_time":1000,"v_timestamp":0,"v_timestamptz":"0001-01-01T00:00:00.123456Z","v_varchar":""},"op":"d","source":{"db":"dev","table":"t_kafka","ts_ms":1700210132903},"ts_ms":1700210132903},"schema":{"fields":[{"field":"before","fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","name":"org.apache.kafka.connect.data.Timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","name":"io.debezium.time.Interval","optional":true,"type":"string"},{"field":"v_date","name":"org.apache.kafka.connect.data.Date","optional":true,"type":"int32"},{"field":"v_time","name":"org.apache.kafka.connect.data.Time","optional":true,"type":"int64"},{"field":"v_timestamptz","name":"io.debezium.time.ZonedTimestamp","optional":true,"type":"string"},{"field":"v_jsonb","name":"io.debezium.data.Json","optional":true,"type":"string"}],"name":"RisingWave.dev.t_kafka.Key","optional":true,"type":"struct"},{"field":"after","fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","name":"org.apache.kafka.connect.data.Timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","name":"io.debezium.time.Interval","optional":true,"type":"string"},{"field":"v_date","name":"org.apache.kafka.connect.data.Date","optional":true,"type":"int32"},{"field":"v_time","name":"org.apache.kafka.connect.data.Time","optional":true,"type":"int64"},{"field":"v_timestamptz","name":"io.debezium.time.ZonedTimestamp","optional":true,"type":"string"},{"field":"v_jsonb","name":"io.debezium.data.Json","optional":true,"type":"string"}],"name":"RisingWave.dev.t_kafka.Key","optional":true,"type":"struct"},{"field":"source","fields":[{"field":"db","optional":false,"type":"string"},{"field":"table","optional":true,"type":"string"},{"field":"ts_ms","optional":false,"type":"int64"}],"name":"RisingWave.dev.t_kafka.Source","optional":false,"type":"struct"},{"field":"op","optional":false,"type":"string"},{"field":"ts_ms","optional":false,"type":"int64"}],"name":"RisingWave.dev.t_kafka.Envelope","optional":false,"type":"struct"}} {"payload":{"id":1},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"RisingWave.dev.t_kafka.Key","optional":false,"type":"struct"}} {"payload":{"after":{"id":1,"v_bigint":0,"v_bytea":"AA==","v_date":-719162,"v_double":0.0,"v_float":0.0,"v_integer":0,"v_interval":"P0Y0M0DT0H0M0S","v_jsonb":"{}","v_smallint":0,"v_time":1000,"v_timestamp":0,"v_timestamptz":"0001-01-01T00:00:00.123456Z","v_varchar":""},"before":{"id":1,"v_bigint":1872,"v_bytea":"AA==","v_date":-719162,"v_double":23956.39329760601,"v_float":26261.416015625,"v_integer":1872,"v_interval":"P0Y0M0DT0H0M0S","v_jsonb":"{}","v_smallint":31031,"v_time":1000,"v_timestamp":1681453634104,"v_timestamptz":"0001-01-01T00:00:00.123456Z","v_varchar":"8DfUFencLe"},"op":"u","source":{"db":"dev","table":"t_kafka","ts_ms":1700210127905},"ts_ms":1700210127905},"schema":{"fields":[{"field":"before","fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","name":"org.apache.kafka.connect.data.Timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","name":"io.debezium.time.Interval","optional":true,"type":"string"},{"field":"v_date","name":"org.apache.kafka.connect.data.Date","optional":true,"type":"int32"},{"field":"v_time","name":"org.apache.kafka.connect.data.Time","optional":true,"type":"int64"},{"field":"v_timestamptz","name":"io.debezium.time.ZonedTimestamp","optional":true,"type":"string"},{"field":"v_jsonb","name":"io.debezium.data.Json","optional":true,"type":"string"}],"name":"RisingWave.dev.t_kafka.Key","optional":true,"type":"struct"},{"field":"after","fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","name":"org.apache.kafka.connect.data.Timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","name":"io.debezium.time.Interval","optional":true,"type":"string"},{"field":"v_date","name":"org.apache.kafka.connect.data.Date","optional":true,"type":"int32"},{"field":"v_time","name":"org.apache.kafka.connect.data.Time","optional":true,"type":"int64"},{"field":"v_timestamptz","name":"io.debezium.time.ZonedTimestamp","optional":true,"type":"string"},{"field":"v_jsonb","name":"io.debezium.data.Json","optional":true,"type":"string"}],"name":"RisingWave.dev.t_kafka.Key","optional":true,"type":"struct"},{"field":"source","fields":[{"field":"db","optional":false,"type":"string"},{"field":"table","optional":true,"type":"string"},{"field":"ts_ms","optional":false,"type":"int64"}],"name":"RisingWave.dev.t_kafka.Source","optional":false,"type":"struct"},{"field":"op","optional":false,"type":"string"},{"field":"ts_ms","optional":false,"type":"int64"}],"name":"RisingWave.dev.t_kafka.Envelope","optional":false,"type":"struct"}} {"payload":{"id":1},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"RisingWave.dev.t_kafka.Key","optional":false,"type":"struct"}} {"payload":{"after":{"id":1,"v_bigint":1872,"v_bytea":"AA==","v_date":-719162,"v_double":23956.39329760601,"v_float":26261.416015625,"v_integer":1872,"v_interval":"P0Y0M0DT0H0M0S","v_jsonb":"{}","v_smallint":31031,"v_time":1000,"v_timestamp":1681453634104,"v_timestamptz":"0001-01-01T00:00:00.123456Z","v_varchar":"8DfUFencLe"},"before":null,"op":"c","source":{"db":"dev","table":"t_kafka","ts_ms":1700210120041},"ts_ms":1700210120041},"schema":{"fields":[{"field":"before","fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","name":"org.apache.kafka.connect.data.Timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","name":"io.debezium.time.Interval","optional":true,"type":"string"},{"field":"v_date","name":"org.apache.kafka.connect.data.Date","optional":true,"type":"int32"},{"field":"v_time","name":"org.apache.kafka.connect.data.Time","optional":true,"type":"int64"},{"field":"v_timestamptz","name":"io.debezium.time.ZonedTimestamp","optional":true,"type":"string"},{"field":"v_jsonb","name":"io.debezium.data.Json","optional":true,"type":"string"}],"name":"RisingWave.dev.t_kafka.Key","optional":true,"type":"struct"},{"field":"after","fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","name":"org.apache.kafka.connect.data.Timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","name":"io.debezium.time.Interval","optional":true,"type":"string"},{"field":"v_date","name":"org.apache.kafka.connect.data.Date","optional":true,"type":"int32"},{"field":"v_time","name":"org.apache.kafka.connect.data.Time","optional":true,"type":"int64"},{"field":"v_timestamptz","name":"io.debezium.time.ZonedTimestamp","optional":true,"type":"string"},{"field":"v_jsonb","name":"io.debezium.data.Json","optional":true,"type":"string"}],"name":"RisingWave.dev.t_kafka.Key","optional":true,"type":"struct"},{"field":"source","fields":[{"field":"db","optional":false,"type":"string"},{"field":"table","optional":true,"type":"string"},{"field":"ts_ms","optional":false,"type":"int64"}],"name":"RisingWave.dev.t_kafka.Source","optional":false,"type":"struct"},{"field":"op","optional":false,"type":"string"},{"field":"ts_ms","optional":false,"type":"int64"}],"name":"RisingWave.dev.t_kafka.Envelope","optional":false,"type":"struct"}} diff --git a/e2e_test/sink/kafka/upsert3.result b/e2e_test/sink/kafka/upsert3.result index dde36e5f444eb..fe845aff27301 100644 --- a/e2e_test/sink/kafka/upsert3.result +++ b/e2e_test/sink/kafka/upsert3.result @@ -1,5 +1,5 @@ {"id":10} {"id":10,"v_bigint":20674,"v_bytea":"AA==","v_date":719163,"v_double":9042.404483827513,"v_float":19387.23828125,"v_integer":20674,"v_interval":"P0Y0M0DT5H1M0.123457S","v_jsonb":"{}","v_smallint":26951,"v_time":1000,"v_timestamp":1681404058888,"v_timestamptz":"1970-01-01 00:00:00.123456","v_varchar":"0oVqRIHqkb"} -{"id":1} null +{"id":1} {"id":1} {"id":1,"v_bigint":0,"v_bytea":"AA==","v_date":1,"v_double":0.0,"v_float":0.0,"v_integer":0,"v_interval":"P0Y0M0DT0H0M0S","v_jsonb":"{}","v_smallint":0,"v_time":1000,"v_timestamp":0,"v_timestamptz":"0001-01-01 00:00:00.123456","v_varchar":""} {"id":1} {"id":1,"v_bigint":1872,"v_bytea":"AA==","v_date":1,"v_double":23956.39329760601,"v_float":26261.416015625,"v_integer":1872,"v_interval":"P0Y0M0DT0H0M0S","v_jsonb":"{}","v_smallint":31031,"v_time":1000,"v_timestamp":1681453634104,"v_timestamptz":"0001-01-01 00:00:00.123456","v_varchar":"8DfUFencLe"} {"id":2} {"id":2,"v_bigint":4598,"v_bytea":"AA==","v_date":719163,"v_double":31923.077305746086,"v_float":27031.224609375,"v_integer":4598,"v_interval":"P0Y0M0DT4H0M0S","v_jsonb":"{}","v_smallint":22690,"v_time":1000,"v_timestamp":1681429444869,"v_timestamptz":"0001-01-01 00:00:00.123456","v_varchar":"sIo1XXVeHZ"} diff --git a/e2e_test/sink/kafka/upsert_schema3.result b/e2e_test/sink/kafka/upsert_schema3.result index 18675dcbb76d3..ccbb7fe542b37 100644 --- a/e2e_test/sink/kafka/upsert_schema3.result +++ b/e2e_test/sink/kafka/upsert_schema3.result @@ -1,5 +1,5 @@ {"payload":{"id":10},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"dev.t_kafka","optional":false,"type":"struct"}} {"payload":{"id":10,"v_bigint":20674,"v_bytea":"AA==","v_date":719163,"v_double":9042.404483827513,"v_float":19387.23828125,"v_integer":20674,"v_interval":"P0Y0M0DT5H1M0.123457S","v_jsonb":"{}","v_smallint":26951,"v_time":1000,"v_timestamp":1681404058888,"v_timestamptz":"1970-01-01T00:00:00.123456Z","v_varchar":"0oVqRIHqkb"},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","optional":true,"type":"string"},{"field":"v_date","optional":true,"type":"int32"},{"field":"v_time","optional":true,"type":"int64"},{"field":"v_timestamptz","optional":true,"type":"string"},{"field":"v_jsonb","optional":true,"type":"string"}],"name":"dev.t_kafka","optional":false,"type":"struct"}} -{"payload":{"id":1},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"dev.t_kafka","optional":false,"type":"struct"}} null +{"payload":{"id":1},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"dev.t_kafka","optional":false,"type":"struct"}} {"payload":{"id":1},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"dev.t_kafka","optional":false,"type":"struct"}} {"payload":{"id":1,"v_bigint":0,"v_bytea":"AA==","v_date":1,"v_double":0.0,"v_float":0.0,"v_integer":0,"v_interval":"P0Y0M0DT0H0M0S","v_jsonb":"{}","v_smallint":0,"v_time":1000,"v_timestamp":0,"v_timestamptz":"0001-01-01T00:00:00.123456Z","v_varchar":""},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","optional":true,"type":"string"},{"field":"v_date","optional":true,"type":"int32"},{"field":"v_time","optional":true,"type":"int64"},{"field":"v_timestamptz","optional":true,"type":"string"},{"field":"v_jsonb","optional":true,"type":"string"}],"name":"dev.t_kafka","optional":false,"type":"struct"}} {"payload":{"id":1},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"dev.t_kafka","optional":false,"type":"struct"}} {"payload":{"id":1,"v_bigint":1872,"v_bytea":"AA==","v_date":1,"v_double":23956.39329760601,"v_float":26261.416015625,"v_integer":1872,"v_interval":"P0Y0M0DT0H0M0S","v_jsonb":"{}","v_smallint":31031,"v_time":1000,"v_timestamp":1681453634104,"v_timestamptz":"0001-01-01T00:00:00.123456Z","v_varchar":"8DfUFencLe"},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","optional":true,"type":"string"},{"field":"v_date","optional":true,"type":"int32"},{"field":"v_time","optional":true,"type":"int64"},{"field":"v_timestamptz","optional":true,"type":"string"},{"field":"v_jsonb","optional":true,"type":"string"}],"name":"dev.t_kafka","optional":false,"type":"struct"}} {"payload":{"id":2},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"dev.t_kafka","optional":false,"type":"struct"}} {"payload":{"id":2,"v_bigint":4598,"v_bytea":"AA==","v_date":719163,"v_double":31923.077305746086,"v_float":27031.224609375,"v_integer":4598,"v_interval":"P0Y0M0DT4H0M0S","v_jsonb":"{}","v_smallint":22690,"v_time":1000,"v_timestamp":1681429444869,"v_timestamptz":"0001-01-01T00:00:00.123456Z","v_varchar":"sIo1XXVeHZ"},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","optional":true,"type":"string"},{"field":"v_date","optional":true,"type":"int32"},{"field":"v_time","optional":true,"type":"int64"},{"field":"v_timestamptz","optional":true,"type":"string"},{"field":"v_jsonb","optional":true,"type":"string"}],"name":"dev.t_kafka","optional":false,"type":"struct"}}