Skip to content

Commit

Permalink
feat(source): support avro uuid type
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Jun 3, 2024
1 parent fa49955 commit 07ce8e3
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 21 deletions.
41 changes: 23 additions & 18 deletions e2e_test/source_inline/kafka/avro/name_strategy.slt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ create source s1 () with (
#######################

# TODO: refactor the producer script and the test data format.
# Currently we are abusing this test case to also test data types.

system ok
python3 scripts/source/schema_registry_producer.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" e2e_test/source_inline/kafka/avro/upsert_avro_json "topic" "avro"

Expand Down Expand Up @@ -136,11 +138,12 @@ FROM
ORDER BY
"ID";
----
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47 NULL NULL
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99 NULL NULL
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47 NULL NULL
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49 NULL NULL
NULL NULL NULL NULL NULL NULL NULL NULL NULL -0.01 NULL (abcdefg)
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47 NULL NULL NULL
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99 NULL NULL NULL
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47 NULL NULL NULL
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49 NULL NULL NULL
NULL id6 NULL NULL NULL NULL NULL NULL NULL -0.01 NULL (abcdefg) NULL
NULL id7 NULL NULL NULL NULL NULL NULL NULL -0.01 NULL NULL 67e55044-10b1-426f-9247-bb680e5fe0c8

# query II
# SELECT
Expand All @@ -165,14 +168,15 @@ FROM
ORDER BY
"ID";
----
update id1 1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z 99999999.99 NULL NULL
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47 NULL NULL
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99 NULL NULL
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47 NULL NULL
delete id4 4 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49 NULL NULL
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49 NULL NULL
NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL
NULL NULL NULL NULL NULL NULL NULL NULL NULL -0.01 NULL (abcdefg)
update id1 1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z 99999999.99 NULL NULL NULL
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47 NULL NULL NULL
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99 NULL NULL NULL
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47 NULL NULL NULL
delete id4 4 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49 NULL NULL NULL
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49 NULL NULL NULL
NULL id6 NULL NULL NULL NULL NULL NULL NULL -0.01 NULL (abcdefg) NULL
NULL id7 NULL NULL NULL NULL NULL NULL NULL -0.01 NULL NULL 67e55044-10b1-426f-9247-bb680e5fe0c8
NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL


query II
Expand All @@ -183,11 +187,12 @@ FROM
ORDER BY
"ID";
----
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47 NULL NULL
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99 NULL NULL
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47 NULL NULL
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49 NULL NULL
NULL NULL NULL NULL NULL NULL NULL NULL NULL -0.01 NULL (abcdefg)
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47 NULL NULL NULL
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99 NULL NULL NULL
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47 NULL NULL NULL
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49 NULL NULL NULL
NULL id6 NULL NULL NULL NULL NULL NULL NULL -0.01 NULL (abcdefg) NULL
NULL id7 NULL NULL NULL NULL NULL NULL NULL -0.01 NULL NULL 67e55044-10b1-426f-9247-bb680e5fe0c8



Expand Down
5 changes: 3 additions & 2 deletions e2e_test/source_inline/kafka/avro/upsert_avro_json
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
"string"^{"type":"record","name":"OBJ_ATTRIBUTE_VALUE","namespace":"CPLM","fields":[{"name":"op_type","type":["null","string"],"default":null},{"name":"ID","type":["null","string"],"default":null},{"name":"CLASS_ID","type":["null","string"],"default":null},{"name":"ITEM_ID","type":["null","string"],"default":null},{"name":"ATTR_ID","type":["null","string"],"default":null},{"name":"ATTR_VALUE","type":["null","string"],"default":null},{"name":"ORG_ID","type":["null","string"],"default":null},{"name":"UNIT_INFO","type":["null","string"],"default":null},{"name":"UPD_TIME","type":["null","string"],"default":null},{"name":"DEC_VAL","type":[{"type":"bytes","logicalType":"decimal","precision":10,"scale":2},"null"],"default":"\u00ff"},{"name":"REFERRED","type":["null",{"type":"record","name":"REFERRED_TYPE","fields":[{"name":"a","type":"string"}]}],"default":null},{"name":"REF","type":["null","REFERRED_TYPE"],"default":null}],"connect.name":"CPLM.OBJ_ATTRIBUTE_VALUE"}
"string"^{"type":"record","name":"OBJ_ATTRIBUTE_VALUE","namespace":"CPLM","fields":[{"name":"op_type","type":["null","string"],"default":null},{"name":"ID","type":["null","string"],"default":null},{"name":"CLASS_ID","type":["null","string"],"default":null},{"name":"ITEM_ID","type":["null","string"],"default":null},{"name":"ATTR_ID","type":["null","string"],"default":null},{"name":"ATTR_VALUE","type":["null","string"],"default":null},{"name":"ORG_ID","type":["null","string"],"default":null},{"name":"UNIT_INFO","type":["null","string"],"default":null},{"name":"UPD_TIME","type":["null","string"],"default":null},{"name":"DEC_VAL","type":[{"type":"bytes","logicalType":"decimal","precision":10,"scale":2},"null"],"default":"\u00ff"},{"name":"REFERRED","type":["null",{"type":"record","name":"REFERRED_TYPE","fields":[{"name":"a","type":"string"}]}],"default":null},{"name":"REF","type":["null","REFERRED_TYPE"],"default":null},{"name":"uuid","type":["null",{"type":"string","logicalType":"uuid"}],"default":null}],"connect.name":"CPLM.OBJ_ATTRIBUTE_VALUE"}
"id1"^{"op_type": {"string": "update"}, "ID": {"string": "id1"}, "CLASS_ID": {"string": "1"}, "ITEM_ID": {"string": "6768"}, "ATTR_ID": {"string": "6970"}, "ATTR_VALUE": {"string": "value9"}, "ORG_ID": {"string": "7172"}, "UNIT_INFO": {"string": "info9"}, "UPD_TIME": {"string": "2021-05-18T07:59:58.714Z"}, "DEC_VAL": {"bytes": "\u0002\u0054\u000b\u00e3\u00ff"}}
"id2"^{"op_type": {"string": "delete"}, "ID": {"string": "id2"}, "CLASS_ID": {"string": "2"}, "ITEM_ID": {"string": "7778"}, "ATTR_ID": {"string": "7980"}, "ATTR_VALUE": {"string": "value10"}, "ORG_ID": {"string": "8182"}, "UNIT_INFO": {"string": "info10"}, "UPD_TIME": {"string": "2021-05-19T15:22:45.539Z"}, "DEC_VAL": {"bytes": "\u0002\u0054\u000b\u00e3\u00ff"}}
"id3"^{"op_type": {"string": "delete"}, "ID": {"string": "id3"}, "CLASS_ID": {"string": "3"}, "ITEM_ID": {"string": "7778"}, "ATTR_ID": {"string": "7980"}, "ATTR_VALUE": {"string": "value10"}, "ORG_ID": {"string": "8182"}, "UNIT_INFO": {"string": "info10"}, "UPD_TIME": {"string": "2021-05-19T15:22:45.539Z"}, "DEC_VAL": {"bytes": "\u007f\u00ff\u00ff\u00ff"}}
"id4"^{"op_type": {"string": "delete"}, "ID": {"string": "id4"}, "CLASS_ID": {"string": "4"}, "ITEM_ID": {"string": "7778"}, "ATTR_ID": {"string": "7980"}, "ATTR_VALUE": {"string": "value10"}, "ORG_ID": {"string": "8182"}, "UNIT_INFO": {"string": "info10"}, "UPD_TIME": {"string": "2021-05-19T15:22:45.539Z"}, "DEC_VAL": {"bytes": "\u0000\u0080\u0000\u0000\u0001"}}
"id5"^{"op_type": {"string": "delete"}, "ID": {"string": "id5"}, "CLASS_ID": {"string": "5"}, "ITEM_ID": {"string": "7778"}, "ATTR_ID": {"string": "7980"}, "ATTR_VALUE": {"string": "value10"}, "ORG_ID": {"string": "8182"}, "UNIT_INFO": {"string": "info10"}, "UPD_TIME": {"string": "2021-05-19T15:22:45.539Z"}, "DEC_VAL": {"bytes": "\u0000\u0080\u0000\u0000\u0001"}}
"id1"^{"op_type": {"string": "update"}, "ID": {"string": "id1"}, "CLASS_ID": {"string": "-1"}, "ITEM_ID": {"string": "6768"}, "ATTR_ID": {"string": "6970"}, "ATTR_VALUE": {"string": "value9"}, "ORG_ID": {"string": "7172"}, "UNIT_INFO": {"string": "info9"}, "UPD_TIME": {"string": "2021-05-18T07:59:58.714Z"}, "DEC_VAL": {"bytes": "\u0080\u0000\u0000\u0001"}}
"id4"^
"id100"^{"REF": {"CPLM.REFERRED_TYPE":{"a": "abcdefg"}}}
"id6"^{"ID": {"string": "id6"},"REF": {"CPLM.REFERRED_TYPE":{"a": "abcdefg"}}}
"id7"^{"ID": {"string": "id7"},"uuid": {"string": "67e55044-10b1-426f-9247-bb680e5fe0c8"}}
3 changes: 2 additions & 1 deletion src/connector/src/parser/avro/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ fn avro_type_mapping(
}
}
}
Schema::Null | Schema::Fixed(_) | Schema::Uuid => {
Schema::Uuid => DataType::Varchar,
Schema::Null | Schema::Fixed(_) => {
bail!("unsupported Avro type: {:?}", schema)
}
};
Expand Down
3 changes: 3 additions & 0 deletions src/connector/src/parser/unified/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,9 @@ impl<'a> AvroParseOptions<'a> {
debug_assert!(jsonb.as_ref().is_object());
JsonbVal::from(jsonb).into()
}
(DataType::Varchar, Value::Uuid(uuid)) => {
uuid.as_hyphenated().to_string().into_boxed_str().into()
}

(_expected, _got) => Err(create_error())?,
};
Expand Down

0 comments on commit 07ce8e3

Please sign in to comment.