Skip to content

Commit

Permalink
correctly handle null value
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed May 24, 2024
1 parent aeb7450 commit 5c33a8d
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 20 deletions.
18 changes: 9 additions & 9 deletions ci/scripts/e2e-kafka-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ sleep 2

# test append-only kafka sink
echo "testing append-only kafka sink"
diff ./e2e_test/sink/kafka/append_only1.result \
diff -b ./e2e_test/sink/kafka/append_only1.result \
<((rpk topic consume test-rw-sink-append-only --offset start --format '%v\n' --num 10 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for append-only sink is not as expected."
Expand All @@ -26,7 +26,7 @@ fi

# test upsert kafka sink
echo "testing upsert kafka sink"
diff ./e2e_test/sink/kafka/upsert1.result \
diff -b ./e2e_test/sink/kafka/upsert1.result \
<((rpk topic consume test-rw-sink-upsert --offset start --format '%k\t%v\n' --num 10 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink is not as expected."
Expand All @@ -35,7 +35,7 @@ fi

# test upsert kafka sink with schema
echo "testing upsert kafka sink with schema"
diff ./e2e_test/sink/kafka/upsert_schema1.result \
diff -b ./e2e_test/sink/kafka/upsert_schema1.result \
<((rpk topic consume test-rw-sink-upsert-schema --offset start --format '%k\t%v\n' --num 10 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink with schema is not as expected."
Expand All @@ -60,7 +60,7 @@ psql -h localhost -p 4566 -d dev -U root -c "update t_kafka set v_varchar = '',

# test append-only kafka sink after update
echo "testing append-only kafka sink after updating data"
diff ./e2e_test/sink/kafka/append_only2.result \
diff -b ./e2e_test/sink/kafka/append_only2.result \
<((rpk topic consume test-rw-sink-append-only --offset start --format '%v\n' --num 11 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for append-only sink after update is not as expected."
Expand All @@ -69,7 +69,7 @@ fi

# test upsert kafka sink after update
echo "testing upsert kafka sink after updating data"
diff ./e2e_test/sink/kafka/upsert2.result \
diff -b ./e2e_test/sink/kafka/upsert2.result \
<((rpk topic consume test-rw-sink-upsert --offset start --format '%k\t%v\n' --num 11 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink after update is not as expected."
Expand All @@ -78,7 +78,7 @@ fi

# test upsert kafka sink with schema after update
echo "testing upsert kafka sink with schema after updating data"
diff ./e2e_test/sink/kafka/upsert_schema2.result \
diff -b ./e2e_test/sink/kafka/upsert_schema2.result \
<((rpk topic consume test-rw-sink-upsert-schema --offset start --format '%k\t%v\n' --num 11 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink with schema is not as expected."
Expand All @@ -99,7 +99,7 @@ fi

# test without-snapshot kafka sink
echo "testing without-snapshot kafka sink"
diff ./e2e_test/sink/kafka/without_snapshot.result \
diff -b ./e2e_test/sink/kafka/without_snapshot.result \
<((rpk topic consume test-rw-sink-without-snapshot --offset start --format '%v\n' --num 3 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for append-only sink is not as expected."
Expand All @@ -112,7 +112,7 @@ psql -h localhost -p 4566 -d dev -U root -c "delete from t_kafka where id = 1;"

# test upsert kafka sink after delete
echo "testing upsert kafka sink after deleting data"
diff ./e2e_test/sink/kafka/upsert3.result \
diff -b ./e2e_test/sink/kafka/upsert3.result \
<((rpk topic consume test-rw-sink-upsert --offset start --format '%k\t%v\n' --num 12 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink after update is not as expected."
Expand All @@ -121,7 +121,7 @@ fi

# test upsert kafka sink with schema after delete
echo "testing upsert kafka sink with schema after deleting data"
diff ./e2e_test/sink/kafka/upsert_schema3.result \
diff -b ./e2e_test/sink/kafka/upsert_schema3.result \
<((rpk topic consume test-rw-sink-upsert-schema --offset start --format '%k\t%v\n' --num 12 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink with schema is not as expected."
Expand Down
14 changes: 6 additions & 8 deletions e2e_test/sink/kafka/debezium.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
# debezium sink sends k/v pair
kv = line.split()
key = json.loads(kv[0])
# kafka consumer outputs string "null" for null payload
if kv[1] == "null":
value = kv[1]
# rpk output nothing for null payload
if len(kv) == 1:
value = None
else:
value = json.loads(kv[1])
# The `ts_ms` field may vary, so we delete it from the json object
Expand All @@ -26,12 +26,10 @@
with open(test_output_file) as file:
for line in file:
kv = line.split()
if len(kv) != 2:
print(line)
assert(len(kv) == 2)
key = json.loads(kv[0])
if kv[1] == "null":
value = kv[1]
# rpk output nothing for null payload
if len(kv) == 1:
value = None
else:
value = json.loads(kv[1])
# Assert `ts_ms` is an integer here.
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/sink/kafka/debezium3.result
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{"payload":{"id":10},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"RisingWave.dev.t_kafka.Key","optional":false,"type":"struct"}} {"payload":{"after":{"id":10,"v_bigint":20674,"v_bytea":"AA==","v_date":0,"v_double":9042.404483827513,"v_float":19387.23828125,"v_integer":20674,"v_interval":"P0Y0M0DT5H1M0.123457S","v_jsonb":"{}","v_smallint":26951,"v_time":1000,"v_timestamp":1681404058888,"v_timestamptz":"1970-01-01T00:00:00.123456Z","v_varchar":"0oVqRIHqkb"},"before":null,"op":"c","source":{"db":"dev","table":"t_kafka","ts_ms":1700210120041},"ts_ms":1700210120041},"schema":{"fields":[{"field":"before","fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","name":"org.apache.kafka.connect.data.Timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","name":"io.debezium.time.Interval","optional":true,"type":"string"},{"field":"v_date","name":"org.apache.kafka.connect.data.Date","optional":true,"type":"int32"},{"field":"v_time","name":"org.apache.kafka.connect.data.Time","optional":true,"type":"int64"},{"field":"v_timestamptz","name":"io.debezium.time.ZonedTimestamp","optional":true,"type":"string"},{"field":"v_jsonb","name":"io.debezium.data.Json","optional":true,"type":"string"}],"name":"RisingWave.dev.t_kafka.Key","optional":true,"type":"struct"},{"field":"after","fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","name":"org.apache.kafka.connect.data.Timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","name":"io.debezium.time.Interval","optional":true,"type":"string"},{"field":"v_date","name":"org.apache.kafka.connect.data.Date","optional":true,"type":"int32"},{"field":"v_time","name":"org.apache.kafka.connect.data.Time","optional":true,"type":"int64"},{"field":"v_timestamptz","name":"io.debezium.time.ZonedTimestamp","optional":true,"type":"string"},{"field":"v_jsonb","name":"io.debezium.data.Json","optional":true,"type":"string"}],"name":"RisingWave.dev.t_kafka.Key","optional":true,"type":"struct"},{"field":"source","fields":[{"field":"db","optional":false,"type":"string"},{"field":"table","optional":true,"type":"string"},{"field":"ts_ms","optional":false,"type":"int64"}],"name":"RisingWave.dev.t_kafka.Source","optional":false,"type":"struct"},{"field":"op","optional":false,"type":"string"},{"field":"ts_ms","optional":false,"type":"int64"}],"name":"RisingWave.dev.t_kafka.Envelope","optional":false,"type":"struct"}}
{"payload":{"id":1},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"RisingWave.dev.t_kafka.Key","optional":false,"type":"struct"}} null
{"payload":{"id":1},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"RisingWave.dev.t_kafka.Key","optional":false,"type":"struct"}}
{"payload":{"id":1},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"RisingWave.dev.t_kafka.Key","optional":false,"type":"struct"}} {"payload":{"after":null,"before":{"id":1,"v_bigint":0,"v_bytea":"AA==","v_date":-719162,"v_double":0.0,"v_float":0.0,"v_integer":0,"v_interval":"P0Y0M0DT0H0M0S","v_jsonb":"{}","v_smallint":0,"v_time":1000,"v_timestamp":0,"v_timestamptz":"0001-01-01T00:00:00.123456Z","v_varchar":""},"op":"d","source":{"db":"dev","table":"t_kafka","ts_ms":1700210132903},"ts_ms":1700210132903},"schema":{"fields":[{"field":"before","fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","name":"org.apache.kafka.connect.data.Timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","name":"io.debezium.time.Interval","optional":true,"type":"string"},{"field":"v_date","name":"org.apache.kafka.connect.data.Date","optional":true,"type":"int32"},{"field":"v_time","name":"org.apache.kafka.connect.data.Time","optional":true,"type":"int64"},{"field":"v_timestamptz","name":"io.debezium.time.ZonedTimestamp","optional":true,"type":"string"},{"field":"v_jsonb","name":"io.debezium.data.Json","optional":true,"type":"string"}],"name":"RisingWave.dev.t_kafka.Key","optional":true,"type":"struct"},{"field":"after","fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","name":"org.apache.kafka.connect.data.Timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","name":"io.debezium.time.Interval","optional":true,"type":"string"},{"field":"v_date","name":"org.apache.kafka.connect.data.Date","optional":true,"type":"int32"},{"field":"v_time","name":"org.apache.kafka.connect.data.Time","optional":true,"type":"int64"},{"field":"v_timestamptz","name":"io.debezium.time.ZonedTimestamp","optional":true,"type":"string"},{"field":"v_jsonb","name":"io.debezium.data.Json","optional":true,"type":"string"}],"name":"RisingWave.dev.t_kafka.Key","optional":true,"type":"struct"},{"field":"source","fields":[{"field":"db","optional":false,"type":"string"},{"field":"table","optional":true,"type":"string"},{"field":"ts_ms","optional":false,"type":"int64"}],"name":"RisingWave.dev.t_kafka.Source","optional":false,"type":"struct"},{"field":"op","optional":false,"type":"string"},{"field":"ts_ms","optional":false,"type":"int64"}],"name":"RisingWave.dev.t_kafka.Envelope","optional":false,"type":"struct"}}
{"payload":{"id":1},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"RisingWave.dev.t_kafka.Key","optional":false,"type":"struct"}} {"payload":{"after":{"id":1,"v_bigint":0,"v_bytea":"AA==","v_date":-719162,"v_double":0.0,"v_float":0.0,"v_integer":0,"v_interval":"P0Y0M0DT0H0M0S","v_jsonb":"{}","v_smallint":0,"v_time":1000,"v_timestamp":0,"v_timestamptz":"0001-01-01T00:00:00.123456Z","v_varchar":""},"before":{"id":1,"v_bigint":1872,"v_bytea":"AA==","v_date":-719162,"v_double":23956.39329760601,"v_float":26261.416015625,"v_integer":1872,"v_interval":"P0Y0M0DT0H0M0S","v_jsonb":"{}","v_smallint":31031,"v_time":1000,"v_timestamp":1681453634104,"v_timestamptz":"0001-01-01T00:00:00.123456Z","v_varchar":"8DfUFencLe"},"op":"u","source":{"db":"dev","table":"t_kafka","ts_ms":1700210127905},"ts_ms":1700210127905},"schema":{"fields":[{"field":"before","fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","name":"org.apache.kafka.connect.data.Timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","name":"io.debezium.time.Interval","optional":true,"type":"string"},{"field":"v_date","name":"org.apache.kafka.connect.data.Date","optional":true,"type":"int32"},{"field":"v_time","name":"org.apache.kafka.connect.data.Time","optional":true,"type":"int64"},{"field":"v_timestamptz","name":"io.debezium.time.ZonedTimestamp","optional":true,"type":"string"},{"field":"v_jsonb","name":"io.debezium.data.Json","optional":true,"type":"string"}],"name":"RisingWave.dev.t_kafka.Key","optional":true,"type":"struct"},{"field":"after","fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","name":"org.apache.kafka.connect.data.Timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","name":"io.debezium.time.Interval","optional":true,"type":"string"},{"field":"v_date","name":"org.apache.kafka.connect.data.Date","optional":true,"type":"int32"},{"field":"v_time","name":"org.apache.kafka.connect.data.Time","optional":true,"type":"int64"},{"field":"v_timestamptz","name":"io.debezium.time.ZonedTimestamp","optional":true,"type":"string"},{"field":"v_jsonb","name":"io.debezium.data.Json","optional":true,"type":"string"}],"name":"RisingWave.dev.t_kafka.Key","optional":true,"type":"struct"},{"field":"source","fields":[{"field":"db","optional":false,"type":"string"},{"field":"table","optional":true,"type":"string"},{"field":"ts_ms","optional":false,"type":"int64"}],"name":"RisingWave.dev.t_kafka.Source","optional":false,"type":"struct"},{"field":"op","optional":false,"type":"string"},{"field":"ts_ms","optional":false,"type":"int64"}],"name":"RisingWave.dev.t_kafka.Envelope","optional":false,"type":"struct"}}
{"payload":{"id":1},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"RisingWave.dev.t_kafka.Key","optional":false,"type":"struct"}} {"payload":{"after":{"id":1,"v_bigint":1872,"v_bytea":"AA==","v_date":-719162,"v_double":23956.39329760601,"v_float":26261.416015625,"v_integer":1872,"v_interval":"P0Y0M0DT0H0M0S","v_jsonb":"{}","v_smallint":31031,"v_time":1000,"v_timestamp":1681453634104,"v_timestamptz":"0001-01-01T00:00:00.123456Z","v_varchar":"8DfUFencLe"},"before":null,"op":"c","source":{"db":"dev","table":"t_kafka","ts_ms":1700210120041},"ts_ms":1700210120041},"schema":{"fields":[{"field":"before","fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","name":"org.apache.kafka.connect.data.Timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","name":"io.debezium.time.Interval","optional":true,"type":"string"},{"field":"v_date","name":"org.apache.kafka.connect.data.Date","optional":true,"type":"int32"},{"field":"v_time","name":"org.apache.kafka.connect.data.Time","optional":true,"type":"int64"},{"field":"v_timestamptz","name":"io.debezium.time.ZonedTimestamp","optional":true,"type":"string"},{"field":"v_jsonb","name":"io.debezium.data.Json","optional":true,"type":"string"}],"name":"RisingWave.dev.t_kafka.Key","optional":true,"type":"struct"},{"field":"after","fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","name":"org.apache.kafka.connect.data.Timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","name":"io.debezium.time.Interval","optional":true,"type":"string"},{"field":"v_date","name":"org.apache.kafka.connect.data.Date","optional":true,"type":"int32"},{"field":"v_time","name":"org.apache.kafka.connect.data.Time","optional":true,"type":"int64"},{"field":"v_timestamptz","name":"io.debezium.time.ZonedTimestamp","optional":true,"type":"string"},{"field":"v_jsonb","name":"io.debezium.data.Json","optional":true,"type":"string"}],"name":"RisingWave.dev.t_kafka.Key","optional":true,"type":"struct"},{"field":"source","fields":[{"field":"db","optional":false,"type":"string"},{"field":"table","optional":true,"type":"string"},{"field":"ts_ms","optional":false,"type":"int64"}],"name":"RisingWave.dev.t_kafka.Source","optional":false,"type":"struct"},{"field":"op","optional":false,"type":"string"},{"field":"ts_ms","optional":false,"type":"int64"}],"name":"RisingWave.dev.t_kafka.Envelope","optional":false,"type":"struct"}}
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/sink/kafka/upsert3.result
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{"id":10} {"id":10,"v_bigint":20674,"v_bytea":"AA==","v_date":719163,"v_double":9042.404483827513,"v_float":19387.23828125,"v_integer":20674,"v_interval":"P0Y0M0DT5H1M0.123457S","v_jsonb":"{}","v_smallint":26951,"v_time":1000,"v_timestamp":1681404058888,"v_timestamptz":"1970-01-01 00:00:00.123456","v_varchar":"0oVqRIHqkb"}
{"id":1} null
{"id":1}
{"id":1} {"id":1,"v_bigint":0,"v_bytea":"AA==","v_date":1,"v_double":0.0,"v_float":0.0,"v_integer":0,"v_interval":"P0Y0M0DT0H0M0S","v_jsonb":"{}","v_smallint":0,"v_time":1000,"v_timestamp":0,"v_timestamptz":"0001-01-01 00:00:00.123456","v_varchar":""}
{"id":1} {"id":1,"v_bigint":1872,"v_bytea":"AA==","v_date":1,"v_double":23956.39329760601,"v_float":26261.416015625,"v_integer":1872,"v_interval":"P0Y0M0DT0H0M0S","v_jsonb":"{}","v_smallint":31031,"v_time":1000,"v_timestamp":1681453634104,"v_timestamptz":"0001-01-01 00:00:00.123456","v_varchar":"8DfUFencLe"}
{"id":2} {"id":2,"v_bigint":4598,"v_bytea":"AA==","v_date":719163,"v_double":31923.077305746086,"v_float":27031.224609375,"v_integer":4598,"v_interval":"P0Y0M0DT4H0M0S","v_jsonb":"{}","v_smallint":22690,"v_time":1000,"v_timestamp":1681429444869,"v_timestamptz":"0001-01-01 00:00:00.123456","v_varchar":"sIo1XXVeHZ"}
Expand Down
Loading

0 comments on commit 5c33a8d

Please sign in to comment.