diff --git a/Cargo.lock b/Cargo.lock index f6563c51b6d1a..1dd12d838d589 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -182,7 +182,7 @@ dependencies = [ [[package]] name = "apache-avro" version = "0.16.0" -source = "git+https://github.com/risingwavelabs/avro?rev=d0846a16ce813a225af04ade35b3b8117b137a29#d0846a16ce813a225af04ade35b3b8117b137a29" +source = "git+https://github.com/risingwavelabs/avro?rev=f33bf6a9d1734d1e23edeb374dc48d26db4b18a5#f33bf6a9d1734d1e23edeb374dc48d26db4b18a5" dependencies = [ "bzip2", "crc32fast", diff --git a/e2e_test/source/basic/nosim_kafka.slt b/e2e_test/source/basic/nosim_kafka.slt index 3e251009df1dd..bc4a88e65ee71 100644 --- a/e2e_test/source/basic/nosim_kafka.slt +++ b/e2e_test/source/basic/nosim_kafka.slt @@ -76,16 +76,16 @@ sleep 8s query II SELECT - op_type, "ID", "CLASS_ID", "ITEM_ID", "ATTR_ID", "ATTR_VALUE", "ORG_ID", "UNIT_INFO", "UPD_TIME" + op_type, "ID", "CLASS_ID", "ITEM_ID", "ATTR_ID", "ATTR_VALUE", "ORG_ID", "UNIT_INFO", "UPD_TIME", "DEC_VAL" FROM upsert_avro_json_default_key ORDER BY "ID"; ---- -update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z -delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z -delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z +update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47 +delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99 +delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47 +delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49 # query II # SELECT diff --git a/scripts/source/schema_registry_producer.py b/scripts/source/schema_registry_producer.py index f2a58fe1628ed..79a3d4db1b40f 100644 --- a/scripts/source/schema_registry_producer.py +++ b/scripts/source/schema_registry_producer.py @@ -24,6 +24,20 @@ def delivery_report(err, msg): return +def load_avro_json(encoded, schema): + """Unlike `json.loads`, this decodes a json according to given avro schema. + + https://avro.apache.org/docs/1.11.1/specification/#json-encoding + Specially, it handles `union` variants, and differentiates `bytes` from `string`. + """ + from fastavro import json_reader + from io import StringIO + + with StringIO(encoded) as buf: + reader = json_reader(buf, schema) + return next(reader) + + if __name__ == '__main__': if len(sys.argv) < 5: print("datagen.py ") @@ -59,6 +73,8 @@ def delivery_report(err, msg): producer = Producer(kafka_conf) key_serializer = None value_serializer = None + key_schema = None + value_schema = None with open(file) as file: for (i, line) in enumerate(file): if i == 0: @@ -71,6 +87,8 @@ def delivery_report(err, msg): value_serializer = AvroSerializer(schema_registry_client=schema_registry_client, schema_str=parts[1], conf=avro_ser_conf) + key_schema = json.loads(parts[0]) + value_schema = json.loads(parts[1]) else: key_serializer = JSONSerializer(schema_registry_client=schema_registry_client, schema_str=parts[0]) @@ -80,28 +98,47 @@ def delivery_report(err, msg): if type == 'avro': value_serializer = AvroSerializer(schema_registry_client=schema_registry_client, schema_str=parts[0]) + value_schema = json.loads(parts[0]) else: value_serializer = JSONSerializer(schema_registry_client=schema_registry_client, schema_str=parts[0]) else: parts = line.split("^") + key_idx = None + value_idx = None + if len(parts) > 1: + key_idx = 0 + if len(parts[1].strip()) > 0: + value_idx = 1 + else: + value_idx = 0 + if type == 'avro': + if key_idx is not None: + key_json = load_avro_json(parts[key_idx], key_schema) + if value_idx is not None: + value_json = load_avro_json(parts[value_idx], value_schema) + else: + if key_idx is not None: + key_json = json.loads(parts[key_idx]) + if value_idx is not None: + value_json = json.loads(parts[value_idx]) if len(parts) > 1: if len(parts[1].strip()) > 0: producer.produce(topic=topic, partition=0, - key=key_serializer(json.loads(parts[0]), + key=key_serializer(key_json, SerializationContext(topic, MessageField.KEY)), value=value_serializer( - json.loads(parts[1]), SerializationContext(topic, MessageField.VALUE)), + value_json, SerializationContext(topic, MessageField.VALUE)), on_delivery=delivery_report) else: producer.produce(topic=topic, partition=0, - key=key_serializer(json.loads(parts[0]), + key=key_serializer(key_json, SerializationContext(topic, MessageField.KEY)), on_delivery=delivery_report) else: producer.produce(topic=topic, partition=0, value=value_serializer( - json.loads(parts[0]), SerializationContext(topic, MessageField.VALUE)), + value_json, SerializationContext(topic, MessageField.VALUE)), on_delivery=delivery_report) producer.flush() diff --git a/scripts/source/test_data/debezium_compact_avro_json.1 b/scripts/source/test_data/debezium_compact_avro_json.1 index 48ee296088b23..54e25c147247c 100644 --- a/scripts/source/test_data/debezium_compact_avro_json.1 +++ b/scripts/source/test_data/debezium_compact_avro_json.1 @@ -1,6 +1,6 @@ {"type":"record","name":"Key","namespace":"postgres.public.orders","fields":[{"name":"order_id","type":{"type":"int","connect.default":0},"default":0}],"connect.name":"postgres.public.orders.Key"}^{"type":"record","name":"Envelope","namespace":"postgres.public.orders","fields":[{"name":"before","type":["null",{"type":"record","name":"Value","fields":[{"name":"order_id","type":{"type":"int","connect.default":0},"default":0},{"name":"order_date","type":["null","long"],"default":null},{"name":"customer_name","type":["null","string"],"default":null},{"name":"price","type":["null",{"type":"record","name":"VariableScaleDecimal","namespace":"io.debezium.data","fields":[{"name":"scale","type":"int"},{"name":"value","type":"bytes"}],"connect.doc":"Variable scaled decimal","connect.version":1,"connect.name":"io.debezium.data.VariableScaleDecimal"}],"default":null},{"name":"product_id","type":["null","int"],"default":null},{"name":"order_status","type":["null",{"type":"int","connect.type":"int16"}],"default":null}],"connect.name":"postgres.public.orders.Value"}],"default":null},{"name":"after","type":["null","Value"],"default":null},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.postgresql","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false,incremental"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"sequence","type":["null","string"],"default":null},{"name":"schema","type":"string"},{"name":"table","type":"string"},{"name":"txId","type":["null","long"],"default":null},{"name":"lsn","type":["null","long"],"default":null},{"name":"xmin","type":["null","long"],"default":null}],"connect.name":"io.debezium.connector.postgresql.Source"}},{"name":"op","type":"string"},{"name":"ts_ms","type":["null","long"],"default":null},{"name":"transaction","type":["null",{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long"}]}],"default":null}],"connect.name":"postgres.public.orders.Envelope"} -{"order_id": 1}^{"before":null,"after":{"order_id":1,"order_date":1558430840000,"customer_name":"Bob","price":null,"product_id":1,"order_status":1},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"postgres","ts_ms":1685975423896,"snapshot":"true","db":"mydb","sequence":"[null,\"23911624\"]","schema":"public","table":"orders","txId":490,"lsn":23911624,"xmin":null},"op":"r","ts_ms":1685975424096,"transaction":null} -{"order_id": 2}^{"before":null,"after":{"order_id":2,"order_date":1558430840001,"customer_name":"Alice","price":null,"product_id":2,"order_status":1},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"postgres","ts_ms":1685975423896,"snapshot":"true","db":"mydb","sequence":"[null,\"23911624\"]","schema":"public","table":"orders","txId":490,"lsn":23911624,"xmin":null},"op":"r","ts_ms":1685975424102,"transaction":null} -{"order_id": 3}^{"before":null,"after":{"order_id":3,"order_date":1558430840002,"customer_name":"Alice","price":null,"product_id":2,"order_status":1},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"postgres","ts_ms":1685975423896,"snapshot":"last","db":"mydb","sequence":"[null,\"23911624\"]","schema":"public","table":"orders","txId":490,"lsn":23911624,"xmin":null},"op":"r","ts_ms":1685975424103,"transaction":null} -{"order_id": 1}^{"before":null,"after":{"order_id":1,"order_date":1558430840000,"customer_name":"Bob","price":null,"product_id":3,"order_status":1},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"postgres","ts_ms":1686029203809,"snapshot":"false","db":"mydb","sequence":"[null,\"23911952\"]","schema":"public","table":"orders","txId":491,"lsn":23911952,"xmin":null},"op":"u","ts_ms":1686029204058,"transaction":null} -{"order_id": 1}^ \ No newline at end of file +{"order_id": 1}^{"before": null, "after": {"postgres.public.orders.Value": {"order_id": 1, "order_date": {"long": 1558430840000}, "customer_name": {"string": "Bob"}, "price": null, "product_id": {"int": 1}, "order_status": {"int": 1}}}, "source": {"version": "1.9.7.Final", "connector": "postgresql", "name": "postgres", "ts_ms": 1685975423896, "snapshot": {"string": "true"}, "db": "mydb", "sequence": {"string": "[null,\"23911624\"]"}, "schema": "public", "table": "orders", "txId": {"long": 490}, "lsn": {"long": 23911624}, "xmin": null}, "op": "r", "ts_ms": {"long": 1685975424096}, "transaction": null} +{"order_id": 2}^{"before": null, "after": {"postgres.public.orders.Value": {"order_id": 2, "order_date": {"long": 1558430840001}, "customer_name": {"string": "Alice"}, "price": null, "product_id": {"int": 2}, "order_status": {"int": 1}}}, "source": {"version": "1.9.7.Final", "connector": "postgresql", "name": "postgres", "ts_ms": 1685975423896, "snapshot": {"string": "true"}, "db": "mydb", "sequence": {"string": "[null,\"23911624\"]"}, "schema": "public", "table": "orders", "txId": {"long": 490}, "lsn": {"long": 23911624}, "xmin": null}, "op": "r", "ts_ms": {"long": 1685975424102}, "transaction": null} +{"order_id": 3}^{"before": null, "after": {"postgres.public.orders.Value": {"order_id": 3, "order_date": {"long": 1558430840002}, "customer_name": {"string": "Alice"}, "price": null, "product_id": {"int": 2}, "order_status": {"int": 1}}}, "source": {"version": "1.9.7.Final", "connector": "postgresql", "name": "postgres", "ts_ms": 1685975423896, "snapshot": {"string": "last"}, "db": "mydb", "sequence": {"string": "[null,\"23911624\"]"}, "schema": "public", "table": "orders", "txId": {"long": 490}, "lsn": {"long": 23911624}, "xmin": null}, "op": "r", "ts_ms": {"long": 1685975424103}, "transaction": null} +{"order_id": 1}^{"before": null, "after": {"postgres.public.orders.Value": {"order_id": 1, "order_date": {"long": 1558430840000}, "customer_name": {"string": "Bob"}, "price": null, "product_id": {"int": 3}, "order_status": {"int": 1}}}, "source": {"version": "1.9.7.Final", "connector": "postgresql", "name": "postgres", "ts_ms": 1686029203809, "snapshot": {"string": "false"}, "db": "mydb", "sequence": {"string": "[null,\"23911952\"]"}, "schema": "public", "table": "orders", "txId": {"long": 491}, "lsn": {"long": 23911952}, "xmin": null}, "op": "u", "ts_ms": {"long": 1686029204058}, "transaction": null} +{"order_id": 1}^ diff --git a/scripts/source/test_data/debezium_non_compact_avro_json.1 b/scripts/source/test_data/debezium_non_compact_avro_json.1 index 02343769e5a2c..919b37989ce5c 100644 --- a/scripts/source/test_data/debezium_non_compact_avro_json.1 +++ b/scripts/source/test_data/debezium_non_compact_avro_json.1 @@ -1,7 +1,7 @@ {"type":"record","name":"Key","namespace":"postgres.public.orders","fields":[{"name":"order_id","type":{"type":"int","connect.default":0},"default":0}],"connect.name":"postgres.public.orders.Key"}^{"type":"record","name":"Envelope","namespace":"postgres.public.orders","fields":[{"name":"before","type":["null",{"type":"record","name":"Value","fields":[{"name":"order_id","type":{"type":"int","connect.default":0},"default":0},{"name":"order_date","type":["null","long"],"default":null},{"name":"customer_name","type":["null","string"],"default":null},{"name":"price","type":["null",{"type":"record","name":"VariableScaleDecimal","namespace":"io.debezium.data","fields":[{"name":"scale","type":"int"},{"name":"value","type":"bytes"}],"connect.doc":"Variable scaled decimal","connect.version":1,"connect.name":"io.debezium.data.VariableScaleDecimal"}],"default":null},{"name":"product_id","type":["null","int"],"default":null},{"name":"order_status","type":["null",{"type":"int","connect.type":"int16"}],"default":null}],"connect.name":"postgres.public.orders.Value"}],"default":null},{"name":"after","type":["null","Value"],"default":null},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.postgresql","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false,incremental"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"sequence","type":["null","string"],"default":null},{"name":"schema","type":"string"},{"name":"table","type":"string"},{"name":"txId","type":["null","long"],"default":null},{"name":"lsn","type":["null","long"],"default":null},{"name":"xmin","type":["null","long"],"default":null}],"connect.name":"io.debezium.connector.postgresql.Source"}},{"name":"op","type":"string"},{"name":"ts_ms","type":["null","long"],"default":null},{"name":"transaction","type":["null",{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long"}]}],"default":null}],"connect.name":"postgres.public.orders.Envelope"} -{"order_id": 1}^{"before":null,"after":{"order_id":1,"order_date":1558430840000,"customer_name":"Bob","product_id":1,"order_status":1},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"postgres","ts_ms":1685975423896,"snapshot":"true","db":"mydb","sequence":"[null,\"23911624\"]","schema":"public","table":"orders","txId":490,"lsn":23911624,"xmin":null},"op":"r","ts_ms":1685975424096,"transaction":null} -{"order_id": 2}^{"before":null,"after":{"order_id":2,"order_date":1558430840001,"customer_name":"Alice","product_id":2,"order_status":1},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"postgres","ts_ms":1685975423896,"snapshot":"true","db":"mydb","sequence":"[null,\"23911624\"]","schema":"public","table":"orders","txId":490,"lsn":23911624,"xmin":null},"op":"r","ts_ms":1685975424102,"transaction":null} -{"order_id": 3}^{"before":null,"after":{"order_id":3,"order_date":1558430840002,"customer_name":"Alice","product_id":2,"order_status":1},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"postgres","ts_ms":1685975423896,"snapshot":"last","db":"mydb","sequence":"[null,\"23911624\"]","schema":"public","table":"orders","txId":490,"lsn":23911624,"xmin":null},"op":"r","ts_ms":1685975424103,"transaction":null} -{"order_id": 1}^{"before":null,"after":{"order_id":1,"order_date":1558430840000,"customer_name":"Bob","product_id":3,"order_status":1},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"postgres","ts_ms":1686029203809,"snapshot":"false","db":"mydb","sequence":"[null,\"23911952\"]","schema":"public","table":"orders","txId":491,"lsn":23911952,"xmin":null},"op":"u","ts_ms":1686029204058,"transaction":null} -{"order_id": 1}^{"before":{"order_id":1,"order_date":null,"customer_name":null,"price":null,"product_id":null,"order_status":null},"after":null,"source":{"version":"1.9.7.Final","connector":"postgresql","name":"postgres","ts_ms":1686029268569,"snapshot":"false","db":"mydb","sequence":"[\"23912408\",\"23912488\"]","schema":"public","table":"orders","txId":492,"lsn":23912488,"xmin":null},"op":"d","ts_ms":1686029268858,"transaction":null} -{"order_id": 1}^ \ No newline at end of file +{"order_id": 1}^{"before": null, "after": {"postgres.public.orders.Value": {"order_id": 1, "order_date": {"long": 1558430840000}, "customer_name": {"string": "Bob"}, "price": null, "product_id": {"int": 1}, "order_status": {"int": 1}}}, "source": {"version": "1.9.7.Final", "connector": "postgresql", "name": "postgres", "ts_ms": 1685975423896, "snapshot": {"string": "true"}, "db": "mydb", "sequence": {"string": "[null,\"23911624\"]"}, "schema": "public", "table": "orders", "txId": {"long": 490}, "lsn": {"long": 23911624}, "xmin": null}, "op": "r", "ts_ms": {"long": 1685975424096}, "transaction": null} +{"order_id": 2}^{"before": null, "after": {"postgres.public.orders.Value": {"order_id": 2, "order_date": {"long": 1558430840001}, "customer_name": {"string": "Alice"}, "price": null, "product_id": {"int": 2}, "order_status": {"int": 1}}}, "source": {"version": "1.9.7.Final", "connector": "postgresql", "name": "postgres", "ts_ms": 1685975423896, "snapshot": {"string": "true"}, "db": "mydb", "sequence": {"string": "[null,\"23911624\"]"}, "schema": "public", "table": "orders", "txId": {"long": 490}, "lsn": {"long": 23911624}, "xmin": null}, "op": "r", "ts_ms": {"long": 1685975424102}, "transaction": null} +{"order_id": 3}^{"before": null, "after": {"postgres.public.orders.Value": {"order_id": 3, "order_date": {"long": 1558430840002}, "customer_name": {"string": "Alice"}, "price": null, "product_id": {"int": 2}, "order_status": {"int": 1}}}, "source": {"version": "1.9.7.Final", "connector": "postgresql", "name": "postgres", "ts_ms": 1685975423896, "snapshot": {"string": "last"}, "db": "mydb", "sequence": {"string": "[null,\"23911624\"]"}, "schema": "public", "table": "orders", "txId": {"long": 490}, "lsn": {"long": 23911624}, "xmin": null}, "op": "r", "ts_ms": {"long": 1685975424103}, "transaction": null} +{"order_id": 1}^{"before": null, "after": {"postgres.public.orders.Value": {"order_id": 1, "order_date": {"long": 1558430840000}, "customer_name": {"string": "Bob"}, "price": null, "product_id": {"int": 3}, "order_status": {"int": 1}}}, "source": {"version": "1.9.7.Final", "connector": "postgresql", "name": "postgres", "ts_ms": 1686029203809, "snapshot": {"string": "false"}, "db": "mydb", "sequence": {"string": "[null,\"23911952\"]"}, "schema": "public", "table": "orders", "txId": {"long": 491}, "lsn": {"long": 23911952}, "xmin": null}, "op": "u", "ts_ms": {"long": 1686029204058}, "transaction": null} +{"order_id": 1}^{"before": {"postgres.public.orders.Value": {"order_id": 1, "order_date": null, "customer_name": null, "price": null, "product_id": null, "order_status": null}}, "after": null, "source": {"version": "1.9.7.Final", "connector": "postgresql", "name": "postgres", "ts_ms": 1686029268569, "snapshot": {"string": "false"}, "db": "mydb", "sequence": {"string": "[\"23912408\",\"23912488\"]"}, "schema": "public", "table": "orders", "txId": {"long": 492}, "lsn": {"long": 23912488}, "xmin": null}, "op": "d", "ts_ms": {"long": 1686029268858}, "transaction": null} +{"order_id": 1}^ diff --git a/scripts/source/test_data/upsert_avro_json.1 b/scripts/source/test_data/upsert_avro_json.1 index 25cc744720d4c..46191e2d0001b 100644 --- a/scripts/source/test_data/upsert_avro_json.1 +++ b/scripts/source/test_data/upsert_avro_json.1 @@ -1,8 +1,8 @@ -"string"^{"type":"record","name":"OBJ_ATTRIBUTE_VALUE","namespace":"CPLM","fields":[{"name":"op_type","type":["null","string"],"default":null},{"name":"ID","type":["null","string"],"default":null},{"name":"CLASS_ID","type":["null","string"],"default":null},{"name":"ITEM_ID","type":["null","string"], "default":null},{"name":"ATTR_ID","type":["null","string"],"default":null},{"name":"ATTR_VALUE","type":["null","string"],"default":null},{"name":"ORG_ID","type":["null","string"],"default":null},{"name":"UNIT_INFO","type":["null","string"],"default":null},{"name":"UPD_TIME","type":["null","string"],"default":null}],"connect.name":"CPLM.OBJ_ATTRIBUTE_VALUE"} -"id1"^{"op_type":"update","ID":"id1","CLASS_ID":"1","ITEM_ID":"6768","ATTR_ID":"6970","ATTR_VALUE":"value9","ORG_ID":"7172","UNIT_INFO":"info9","UPD_TIME":"2021-05-18T07:59:58.714Z"} -"id2"^{"op_type":"delete","ID":"id2","CLASS_ID":"2","ITEM_ID":"7778","ATTR_ID":"7980","ATTR_VALUE":"value10","ORG_ID":"8182","UNIT_INFO":"info10","UPD_TIME":"2021-05-19T15:22:45.539Z"} -"id3"^{"op_type":"delete","ID":"id3","CLASS_ID":"3","ITEM_ID":"7778","ATTR_ID":"7980","ATTR_VALUE":"value10","ORG_ID":"8182","UNIT_INFO":"info10","UPD_TIME":"2021-05-19T15:22:45.539Z"} -"id4"^{"op_type":"delete","ID":"id4","CLASS_ID":"4","ITEM_ID":"7778","ATTR_ID":"7980","ATTR_VALUE":"value10","ORG_ID":"8182","UNIT_INFO":"info10","UPD_TIME":"2021-05-19T15:22:45.539Z"} -"id5"^{"op_type":"delete","ID":"id5","CLASS_ID":"5","ITEM_ID":"7778","ATTR_ID":"7980","ATTR_VALUE":"value10","ORG_ID":"8182","UNIT_INFO":"info10","UPD_TIME":"2021-05-19T15:22:45.539Z"} -"id1"^{"op_type":"update","ID":"id1","CLASS_ID":"-1","ITEM_ID":"6768","ATTR_ID":"6970","ATTR_VALUE":"value9","ORG_ID":"7172","UNIT_INFO":"info9","UPD_TIME":"2021-05-18T07:59:58.714Z"} -"id4"^ \ No newline at end of file +"string"^{"type":"record","name":"OBJ_ATTRIBUTE_VALUE","namespace":"CPLM","fields":[{"name":"op_type","type":["null","string"],"default":null},{"name":"ID","type":["null","string"],"default":null},{"name":"CLASS_ID","type":["null","string"],"default":null},{"name":"ITEM_ID","type":["null","string"], "default":null},{"name":"ATTR_ID","type":["null","string"],"default":null},{"name":"ATTR_VALUE","type":["null","string"],"default":null},{"name":"ORG_ID","type":["null","string"],"default":null},{"name":"UNIT_INFO","type":["null","string"],"default":null},{"name":"UPD_TIME","type":["null","string"],"default":null},{"name":"DEC_VAL","type":["null",{"type":"bytes","logicalType":"decimal","precision":10,"scale":2}],"default":null}],"connect.name":"CPLM.OBJ_ATTRIBUTE_VALUE"} +"id1"^{"op_type": {"string": "update"}, "ID": {"string": "id1"}, "CLASS_ID": {"string": "1"}, "ITEM_ID": {"string": "6768"}, "ATTR_ID": {"string": "6970"}, "ATTR_VALUE": {"string": "value9"}, "ORG_ID": {"string": "7172"}, "UNIT_INFO": {"string": "info9"}, "UPD_TIME": {"string": "2021-05-18T07:59:58.714Z"}, "DEC_VAL": {"bytes": "\u0002\u0054\u000b\u00e3\u00ff"}} +"id2"^{"op_type": {"string": "delete"}, "ID": {"string": "id2"}, "CLASS_ID": {"string": "2"}, "ITEM_ID": {"string": "7778"}, "ATTR_ID": {"string": "7980"}, "ATTR_VALUE": {"string": "value10"}, "ORG_ID": {"string": "8182"}, "UNIT_INFO": {"string": "info10"}, "UPD_TIME": {"string": "2021-05-19T15:22:45.539Z"}, "DEC_VAL": {"bytes": "\u0002\u0054\u000b\u00e3\u00ff"}} +"id3"^{"op_type": {"string": "delete"}, "ID": {"string": "id3"}, "CLASS_ID": {"string": "3"}, "ITEM_ID": {"string": "7778"}, "ATTR_ID": {"string": "7980"}, "ATTR_VALUE": {"string": "value10"}, "ORG_ID": {"string": "8182"}, "UNIT_INFO": {"string": "info10"}, "UPD_TIME": {"string": "2021-05-19T15:22:45.539Z"}, "DEC_VAL": {"bytes": "\u007f\u00ff\u00ff\u00ff"}} +"id4"^{"op_type": {"string": "delete"}, "ID": {"string": "id4"}, "CLASS_ID": {"string": "4"}, "ITEM_ID": {"string": "7778"}, "ATTR_ID": {"string": "7980"}, "ATTR_VALUE": {"string": "value10"}, "ORG_ID": {"string": "8182"}, "UNIT_INFO": {"string": "info10"}, "UPD_TIME": {"string": "2021-05-19T15:22:45.539Z"}, "DEC_VAL": {"bytes": "\u0000\u0080\u0000\u0000\u0001"}} +"id5"^{"op_type": {"string": "delete"}, "ID": {"string": "id5"}, "CLASS_ID": {"string": "5"}, "ITEM_ID": {"string": "7778"}, "ATTR_ID": {"string": "7980"}, "ATTR_VALUE": {"string": "value10"}, "ORG_ID": {"string": "8182"}, "UNIT_INFO": {"string": "info10"}, "UPD_TIME": {"string": "2021-05-19T15:22:45.539Z"}, "DEC_VAL": {"bytes": "\u0000\u0080\u0000\u0000\u0001"}} +"id1"^{"op_type": {"string": "update"}, "ID": {"string": "id1"}, "CLASS_ID": {"string": "-1"}, "ITEM_ID": {"string": "6768"}, "ATTR_ID": {"string": "6970"}, "ATTR_VALUE": {"string": "value9"}, "ORG_ID": {"string": "7172"}, "UNIT_INFO": {"string": "info9"}, "UPD_TIME": {"string": "2021-05-18T07:59:58.714Z"}, "DEC_VAL": {"bytes": "\u0080\u0000\u0000\u0001"}} +"id4"^ diff --git a/scripts/source/test_data/upsert_student_avro_json.1 b/scripts/source/test_data/upsert_student_avro_json.1 index b57b4d716164b..f81242ad3290c 100644 --- a/scripts/source/test_data/upsert_student_avro_json.1 +++ b/scripts/source/test_data/upsert_student_avro_json.1 @@ -1,16 +1,16 @@ {"type":"record","name":"Person","fields":[{"name":"ID","type":"int"}]}^{"type":"record","name":"Person","fields":[{"name":"ID","type":"int"},{"name":"firstName","type":"string"},{"name":"lastName","type":"string"},{"name":"age","type":"int"},{"name":"height","type":"double"},{"name":"weight","type":"double"}]} -{"ID": 1}^{"ID": 1, "firstName": "John", "lastName": "Doe", "age": 18, "height": 5.10, "weight": 150} -{"ID": 2}^{"ID": 2, "firstName": "Sarah", "lastName": "Smith", "age": 19, "height": 5.5, "weight": 120} -{"ID": 3}^{"ID": 3, "firstName": "Ben", "lastName": "Johnson", "age": 21, "height": 6.0, "weight": 175} -{"ID": 4}^{"ID": 4, "firstName": "Emma", "lastName": "Brown", "age": 20, "height": 5.3, "weight": 130} -{"ID": 5}^{"ID": 5, "firstName": "Michael", "lastName": "Williams", "age": 22, "height": 6.2, "weight": 190} -{"ID": 6}^{"ID": 6, "firstName": "Leah", "lastName": "Davis", "age": 18, "height": 5.7, "weight": 140} -{"ID": 7}^{"ID": 7, "firstName": "Connor", "lastName": "Wilson", "age": 19, "height": 5.9, "weight": 160} -{"ID": 8}^{"ID": 8, "firstName": "Ava", "lastName": "Garcia", "age": 21, "height": 5.2, "weight": 115} -{"ID": 9}^{"ID": 9, "firstName": "Jacob", "lastName": "Anderson", "age": 20, "height": 5.8, "weight": 155} -{"ID": 1}^{"ID": 1, "firstName": "Olivia", "lastName": "Hernandez", "age": 22, "height": 5.6, "weight": 125} -{"ID": 1}^{"ID": 1, "firstName": "Ethan", "lastName": "Martinez", "age": 18, "height": 6.1, "weight": 180} -{"ID": 2}^{"ID": 2, "firstName": "Emily", "lastName": "Jackson", "age": 19, "height": 5.4, "weight": 110} -{"ID": 3}^{"ID": 3, "firstName": "Noah", "lastName": "Thompson", "age": 21, "height": 6.3, "weight": 195} +{"ID": 1}^{"ID": 1, "firstName": "John", "lastName": "Doe", "age": 18, "height": 5.1, "weight": 150.0} +{"ID": 2}^{"ID": 2, "firstName": "Sarah", "lastName": "Smith", "age": 19, "height": 5.5, "weight": 120.0} +{"ID": 3}^{"ID": 3, "firstName": "Ben", "lastName": "Johnson", "age": 21, "height": 6.0, "weight": 175.0} +{"ID": 4}^{"ID": 4, "firstName": "Emma", "lastName": "Brown", "age": 20, "height": 5.3, "weight": 130.0} +{"ID": 5}^{"ID": 5, "firstName": "Michael", "lastName": "Williams", "age": 22, "height": 6.2, "weight": 190.0} +{"ID": 6}^{"ID": 6, "firstName": "Leah", "lastName": "Davis", "age": 18, "height": 5.7, "weight": 140.0} +{"ID": 7}^{"ID": 7, "firstName": "Connor", "lastName": "Wilson", "age": 19, "height": 5.9, "weight": 160.0} +{"ID": 8}^{"ID": 8, "firstName": "Ava", "lastName": "Garcia", "age": 21, "height": 5.2, "weight": 115.0} +{"ID": 9}^{"ID": 9, "firstName": "Jacob", "lastName": "Anderson", "age": 20, "height": 5.8, "weight": 155.0} +{"ID": 1}^{"ID": 1, "firstName": "Olivia", "lastName": "Hernandez", "age": 22, "height": 5.6, "weight": 125.0} +{"ID": 1}^{"ID": 1, "firstName": "Ethan", "lastName": "Martinez", "age": 18, "height": 6.1, "weight": 180.0} +{"ID": 2}^{"ID": 2, "firstName": "Emily", "lastName": "Jackson", "age": 19, "height": 5.4, "weight": 110.0} +{"ID": 3}^{"ID": 3, "firstName": "Noah", "lastName": "Thompson", "age": 21, "height": 6.3, "weight": 195.0} {"ID": 7}^ -{"ID": 8}^ \ No newline at end of file +{"ID": 8}^ diff --git a/scripts/source/test_data/upsert_student_key_not_subset_of_value_avro_json.1 b/scripts/source/test_data/upsert_student_key_not_subset_of_value_avro_json.1 index 48c099799a510..779de1452437a 100644 --- a/scripts/source/test_data/upsert_student_key_not_subset_of_value_avro_json.1 +++ b/scripts/source/test_data/upsert_student_key_not_subset_of_value_avro_json.1 @@ -1,16 +1,16 @@ {"type":"record","name":"Person","fields":[{"name":"NOTEXIST","type":"int"}]}^{"type":"record","name":"Person","fields":[{"name":"ID","type":"int"},{"name":"firstName","type":"string"},{"name":"lastName","type":"string"},{"name":"age","type":"int"},{"name":"height","type":"double"},{"name":"weight","type":"double"}]} -{"NOTEXIST": 1}^{"ID": 1, "firstName": "John", "lastName": "Doe", "age": 18, "height": 5.10, "weight": 150} -{"NOTEXIST": 2}^{"ID": 2, "firstName": "Sarah", "lastName": "Smith", "age": 19, "height": 5.5, "weight": 120} -{"NOTEXIST": 3}^{"ID": 3, "firstName": "Ben", "lastName": "Johnson", "age": 21, "height": 6.0, "weight": 175} -{"NOTEXIST": 4}^{"ID": 4, "firstName": "Emma", "lastName": "Brown", "age": 20, "height": 5.3, "weight": 130} -{"NOTEXIST": 5}^{"ID": 5, "firstName": "Michael", "lastName": "Williams", "age": 22, "height": 6.2, "weight": 190} -{"NOTEXIST": 6}^{"ID": 6, "firstName": "Leah", "lastName": "Davis", "age": 18, "height": 5.7, "weight": 140} -{"NOTEXIST": 7}^{"ID": 7, "firstName": "Connor", "lastName": "Wilson", "age": 19, "height": 5.9, "weight": 160} -{"NOTEXIST": 8}^{"ID": 8, "firstName": "Ava", "lastName": "Garcia", "age": 21, "height": 5.2, "weight": 115} -{"NOTEXIST": 9}^{"ID": 9, "firstName": "Jacob", "lastName": "Anderson", "age": 20, "height": 5.8, "weight": 155} -{"NOTEXIST": 1}^{"ID": 1, "firstName": "Olivia", "lastName": "Hernandez", "age": 22, "height": 5.6, "weight": 125} -{"NOTEXIST": 1}^{"ID": 1, "firstName": "Ethan", "lastName": "Martinez", "age": 18, "height": 6.1, "weight": 180} -{"NOTEXIST": 2}^{"ID": 2, "firstName": "Emily", "lastName": "Jackson", "age": 19, "height": 5.4, "weight": 110} -{"NOTEXIST": 3}^{"ID": 3, "firstName": "Noah", "lastName": "Thompson", "age": 21, "height": 6.3, "weight": 195} +{"NOTEXIST": 1}^{"ID": 1, "firstName": "John", "lastName": "Doe", "age": 18, "height": 5.1, "weight": 150.0} +{"NOTEXIST": 2}^{"ID": 2, "firstName": "Sarah", "lastName": "Smith", "age": 19, "height": 5.5, "weight": 120.0} +{"NOTEXIST": 3}^{"ID": 3, "firstName": "Ben", "lastName": "Johnson", "age": 21, "height": 6.0, "weight": 175.0} +{"NOTEXIST": 4}^{"ID": 4, "firstName": "Emma", "lastName": "Brown", "age": 20, "height": 5.3, "weight": 130.0} +{"NOTEXIST": 5}^{"ID": 5, "firstName": "Michael", "lastName": "Williams", "age": 22, "height": 6.2, "weight": 190.0} +{"NOTEXIST": 6}^{"ID": 6, "firstName": "Leah", "lastName": "Davis", "age": 18, "height": 5.7, "weight": 140.0} +{"NOTEXIST": 7}^{"ID": 7, "firstName": "Connor", "lastName": "Wilson", "age": 19, "height": 5.9, "weight": 160.0} +{"NOTEXIST": 8}^{"ID": 8, "firstName": "Ava", "lastName": "Garcia", "age": 21, "height": 5.2, "weight": 115.0} +{"NOTEXIST": 9}^{"ID": 9, "firstName": "Jacob", "lastName": "Anderson", "age": 20, "height": 5.8, "weight": 155.0} +{"NOTEXIST": 1}^{"ID": 1, "firstName": "Olivia", "lastName": "Hernandez", "age": 22, "height": 5.6, "weight": 125.0} +{"NOTEXIST": 1}^{"ID": 1, "firstName": "Ethan", "lastName": "Martinez", "age": 18, "height": 6.1, "weight": 180.0} +{"NOTEXIST": 2}^{"ID": 2, "firstName": "Emily", "lastName": "Jackson", "age": 19, "height": 5.4, "weight": 110.0} +{"NOTEXIST": 3}^{"ID": 3, "firstName": "Noah", "lastName": "Thompson", "age": 21, "height": 6.3, "weight": 195.0} {"NOTEXIST": 7}^ -{"NOTEXIST": 8}^ \ No newline at end of file +{"NOTEXIST": 8}^ diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 555fd2bf4bea8..0a38e601a8492 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -15,7 +15,7 @@ normal = ["workspace-hack"] [dependencies] anyhow = "1" -apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "d0846a16ce813a225af04ade35b3b8117b137a29", features = [ +apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "f33bf6a9d1734d1e23edeb374dc48d26db4b18a5", features = [ "snappy", "zstandard", "bzip",