diff --git a/e2e_test/source_inline/kafka/avro/name_strategy.slt b/e2e_test/source_inline/kafka/avro/name_strategy.slt index 4d6eb79952cdc..b9aaacef37db8 100644 --- a/e2e_test/source_inline/kafka/avro/name_strategy.slt +++ b/e2e_test/source_inline/kafka/avro/name_strategy.slt @@ -22,6 +22,8 @@ create source s1 () with ( ####################### # TODO: refactor the producer script and the test data format. +# Currently we are abusing this test case to also test data types. + system ok python3 scripts/source/schema_registry_producer.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" e2e_test/source_inline/kafka/avro/upsert_avro_json "topic" "avro" @@ -136,11 +138,12 @@ FROM ORDER BY "ID"; ---- -update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47 NULL NULL -delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99 NULL NULL -delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47 NULL NULL -delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49 NULL NULL -NULL NULL NULL NULL NULL NULL NULL NULL NULL -0.01 NULL (abcdefg) +update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47 NULL NULL NULL +delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99 NULL NULL NULL +delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47 NULL NULL NULL +delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49 NULL NULL NULL +NULL id6 NULL NULL NULL NULL NULL NULL NULL -0.01 NULL (abcdefg) NULL +NULL id7 NULL NULL NULL NULL NULL NULL NULL -0.01 NULL NULL 67e55044-10b1-426f-9247-bb680e5fe0c8 # query II # SELECT @@ -165,14 +168,15 @@ FROM ORDER BY "ID"; ---- -update id1 1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z 99999999.99 NULL NULL -update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47 NULL NULL -delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99 NULL NULL -delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47 NULL NULL -delete id4 4 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49 NULL NULL -delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49 NULL NULL -NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL -NULL NULL NULL NULL NULL NULL NULL NULL NULL -0.01 NULL (abcdefg) +update id1 1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z 99999999.99 NULL NULL NULL +update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47 NULL NULL NULL +delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99 NULL NULL NULL +delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47 NULL NULL NULL +delete id4 4 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49 NULL NULL NULL +delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49 NULL NULL NULL +NULL id6 NULL NULL NULL NULL NULL NULL NULL -0.01 NULL (abcdefg) NULL +NULL id7 NULL NULL NULL NULL NULL NULL NULL -0.01 NULL NULL 67e55044-10b1-426f-9247-bb680e5fe0c8 +NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL query II @@ -183,11 +187,12 @@ FROM ORDER BY "ID"; ---- -update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47 NULL NULL -delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99 NULL NULL -delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47 NULL NULL -delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49 NULL NULL -NULL NULL NULL NULL NULL NULL NULL NULL NULL -0.01 NULL (abcdefg) +update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47 NULL NULL NULL +delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99 NULL NULL NULL +delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47 NULL NULL NULL +delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49 NULL NULL NULL +NULL id6 NULL NULL NULL NULL NULL NULL NULL -0.01 NULL (abcdefg) NULL +NULL id7 NULL NULL NULL NULL NULL NULL NULL -0.01 NULL NULL 67e55044-10b1-426f-9247-bb680e5fe0c8 diff --git a/e2e_test/source_inline/kafka/avro/upsert_avro_json b/e2e_test/source_inline/kafka/avro/upsert_avro_json index 0e966da1e13d9..426b27c41fb98 100644 --- a/e2e_test/source_inline/kafka/avro/upsert_avro_json +++ b/e2e_test/source_inline/kafka/avro/upsert_avro_json @@ -1,4 +1,4 @@ -"string"^{"type":"record","name":"OBJ_ATTRIBUTE_VALUE","namespace":"CPLM","fields":[{"name":"op_type","type":["null","string"],"default":null},{"name":"ID","type":["null","string"],"default":null},{"name":"CLASS_ID","type":["null","string"],"default":null},{"name":"ITEM_ID","type":["null","string"],"default":null},{"name":"ATTR_ID","type":["null","string"],"default":null},{"name":"ATTR_VALUE","type":["null","string"],"default":null},{"name":"ORG_ID","type":["null","string"],"default":null},{"name":"UNIT_INFO","type":["null","string"],"default":null},{"name":"UPD_TIME","type":["null","string"],"default":null},{"name":"DEC_VAL","type":[{"type":"bytes","logicalType":"decimal","precision":10,"scale":2},"null"],"default":"\u00ff"},{"name":"REFERRED","type":["null",{"type":"record","name":"REFERRED_TYPE","fields":[{"name":"a","type":"string"}]}],"default":null},{"name":"REF","type":["null","REFERRED_TYPE"],"default":null}],"connect.name":"CPLM.OBJ_ATTRIBUTE_VALUE"} +"string"^{"type":"record","name":"OBJ_ATTRIBUTE_VALUE","namespace":"CPLM","fields":[{"name":"op_type","type":["null","string"],"default":null},{"name":"ID","type":["null","string"],"default":null},{"name":"CLASS_ID","type":["null","string"],"default":null},{"name":"ITEM_ID","type":["null","string"],"default":null},{"name":"ATTR_ID","type":["null","string"],"default":null},{"name":"ATTR_VALUE","type":["null","string"],"default":null},{"name":"ORG_ID","type":["null","string"],"default":null},{"name":"UNIT_INFO","type":["null","string"],"default":null},{"name":"UPD_TIME","type":["null","string"],"default":null},{"name":"DEC_VAL","type":[{"type":"bytes","logicalType":"decimal","precision":10,"scale":2},"null"],"default":"\u00ff"},{"name":"REFERRED","type":["null",{"type":"record","name":"REFERRED_TYPE","fields":[{"name":"a","type":"string"}]}],"default":null},{"name":"REF","type":["null","REFERRED_TYPE"],"default":null},{"name":"uuid","type":["null",{"type":"string","logicalType":"uuid"}],"default":null}],"connect.name":"CPLM.OBJ_ATTRIBUTE_VALUE"} "id1"^{"op_type": {"string": "update"}, "ID": {"string": "id1"}, "CLASS_ID": {"string": "1"}, "ITEM_ID": {"string": "6768"}, "ATTR_ID": {"string": "6970"}, "ATTR_VALUE": {"string": "value9"}, "ORG_ID": {"string": "7172"}, "UNIT_INFO": {"string": "info9"}, "UPD_TIME": {"string": "2021-05-18T07:59:58.714Z"}, "DEC_VAL": {"bytes": "\u0002\u0054\u000b\u00e3\u00ff"}} "id2"^{"op_type": {"string": "delete"}, "ID": {"string": "id2"}, "CLASS_ID": {"string": "2"}, "ITEM_ID": {"string": "7778"}, "ATTR_ID": {"string": "7980"}, "ATTR_VALUE": {"string": "value10"}, "ORG_ID": {"string": "8182"}, "UNIT_INFO": {"string": "info10"}, "UPD_TIME": {"string": "2021-05-19T15:22:45.539Z"}, "DEC_VAL": {"bytes": "\u0002\u0054\u000b\u00e3\u00ff"}} "id3"^{"op_type": {"string": "delete"}, "ID": {"string": "id3"}, "CLASS_ID": {"string": "3"}, "ITEM_ID": {"string": "7778"}, "ATTR_ID": {"string": "7980"}, "ATTR_VALUE": {"string": "value10"}, "ORG_ID": {"string": "8182"}, "UNIT_INFO": {"string": "info10"}, "UPD_TIME": {"string": "2021-05-19T15:22:45.539Z"}, "DEC_VAL": {"bytes": "\u007f\u00ff\u00ff\u00ff"}} @@ -6,4 +6,5 @@ "id5"^{"op_type": {"string": "delete"}, "ID": {"string": "id5"}, "CLASS_ID": {"string": "5"}, "ITEM_ID": {"string": "7778"}, "ATTR_ID": {"string": "7980"}, "ATTR_VALUE": {"string": "value10"}, "ORG_ID": {"string": "8182"}, "UNIT_INFO": {"string": "info10"}, "UPD_TIME": {"string": "2021-05-19T15:22:45.539Z"}, "DEC_VAL": {"bytes": "\u0000\u0080\u0000\u0000\u0001"}} "id1"^{"op_type": {"string": "update"}, "ID": {"string": "id1"}, "CLASS_ID": {"string": "-1"}, "ITEM_ID": {"string": "6768"}, "ATTR_ID": {"string": "6970"}, "ATTR_VALUE": {"string": "value9"}, "ORG_ID": {"string": "7172"}, "UNIT_INFO": {"string": "info9"}, "UPD_TIME": {"string": "2021-05-18T07:59:58.714Z"}, "DEC_VAL": {"bytes": "\u0080\u0000\u0000\u0001"}} "id4"^ -"id100"^{"REF": {"CPLM.REFERRED_TYPE":{"a": "abcdefg"}}} +"id6"^{"ID": {"string": "id6"},"REF": {"CPLM.REFERRED_TYPE":{"a": "abcdefg"}}} +"id7"^{"ID": {"string": "id7"},"uuid": {"string": "67e55044-10b1-426f-9247-bb680e5fe0c8"}} diff --git a/src/connector/src/parser/avro/util.rs b/src/connector/src/parser/avro/util.rs index 3bfdc3fed8646..4bb6d8a4a4bbd 100644 --- a/src/connector/src/parser/avro/util.rs +++ b/src/connector/src/parser/avro/util.rs @@ -218,7 +218,8 @@ fn avro_type_mapping( } } } - Schema::Null | Schema::Fixed(_) | Schema::Uuid => { + Schema::Uuid => DataType::Varchar, + Schema::Null | Schema::Fixed(_) => { bail!("unsupported Avro type: {:?}", schema) } }; diff --git a/src/connector/src/parser/unified/avro.rs b/src/connector/src/parser/unified/avro.rs index 5d8be78ab9ea3..bd01a40380381 100644 --- a/src/connector/src/parser/unified/avro.rs +++ b/src/connector/src/parser/unified/avro.rs @@ -280,6 +280,9 @@ impl<'a> AvroParseOptions<'a> { debug_assert!(jsonb.as_ref().is_object()); JsonbVal::from(jsonb).into() } + (DataType::Varchar, Value::Uuid(uuid)) => { + uuid.as_hyphenated().to_string().into_boxed_str().into() + } (_expected, _got) => Err(create_error())?, };