Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Sep 13, 2024
1 parent 2812517 commit 8cc9a55
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 5 deletions.
12 changes: 8 additions & 4 deletions src/connector/codec/src/decoder/protobuf/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,10 @@ pub fn from_protobuf_value<'a>(
let DataType::Map(map_type) = type_expected else {
return Err(err());
};
let map_desc = kind.as_message().ok_or_else(err)?;
if !map_desc.is_map_entry() {
if !field_desc.is_map() {
return Err(err());
}
let map_desc = kind.as_message().ok_or_else(err)?;

let mut key_builder = map_type.key().create_array_builder(map.len());
let mut value_builder = map_type.value().create_array_builder(map.len());
Expand All @@ -205,11 +205,15 @@ pub fn from_protobuf_value<'a>(
// in the future.
for (key, value) in map.iter().sorted_by_key(|(k, _v)| *k) {
key_builder.append(from_protobuf_value(
field_desc,
&map_desc.map_entry_key_field(),
&key.clone().into(),
map_type.key(),
)?);
value_builder.append(from_protobuf_value(field_desc, value, map_type.value())?);
value_builder.append(from_protobuf_value(
&map_desc.map_entry_value_field(),
value,
map_type.value(),
)?);
}
let keys = key_builder.finish();
let values = value_builder.finish();
Expand Down
53 changes: 52 additions & 1 deletion src/connector/codec/tests/integration_tests/protobuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,26 @@ fn test_all_types() -> anyhow::Result<()> {
int32_value_field: Some(42),
string_value_field: Some("Hello, Wrapper!".to_string()),
example_oneof: Some(ExampleOneof::OneofInt32(123)),
map_struct_field: HashMap::from_iter([
(
"key1".to_string(),
NestedMessage {
id: 1,
name: "A".to_string(),
},
),
(
"key2".to_string(),
NestedMessage {
id: 2,
name: "B".to_string(),
},
),
]),
map_enum_field: HashMap::from_iter([
(1, EnumType::Option1 as i32),
(2, EnumType::Option2 as i32),
]),
}
};
let mut data_bytes = Vec::new();
Expand Down Expand Up @@ -583,6 +603,11 @@ fn test_all_types() -> anyhow::Result<()> {
any_field(#35): Jsonb, type_name: google.protobuf.Any, field_descs: [type_url(#33): Varchar, value(#34): Bytea],
int32_value_field(#37): Struct { value: Int32 }, type_name: google.protobuf.Int32Value, field_descs: [value(#36): Int32],
string_value_field(#39): Struct { value: Varchar }, type_name: google.protobuf.StringValue, field_descs: [value(#38): Varchar],
map_struct_field(#44): Map(Varchar,Struct { id: Int32, name: Varchar }), type_name: all_types.AllTypes.MapStructFieldEntry, field_descs: [key(#40): Varchar, value(#43): Struct {
id: Int32,
name: Varchar,
}, type_name: all_types.AllTypes.NestedMessage, field_descs: [id(#41): Int32, name(#42): Varchar]],
map_enum_field(#47): Map(Int32,Varchar), type_name: all_types.AllTypes.MapEnumFieldEntry, field_descs: [key(#45): Int32, value(#46): Varchar],
]"#]],
expect![[r#"
Owned(Float64(OrderedFloat(1.2345)))
Expand Down Expand Up @@ -641,7 +666,33 @@ fn test_all_types() -> anyhow::Result<()> {
Error at column `any_field`: Fail to convert protobuf Any into jsonb: message 'my_custom_type' not found
~~~~
Owned(StructValue(Int32(42)))
Owned(StructValue(Utf8("Hello, Wrapper!")))"#]],
Owned(StructValue(Utf8("Hello, Wrapper!")))
Owned([
StructValue(
Utf8("key1"),
StructValue(
Int32(1),
Utf8("A"),
),
),
StructValue(
Utf8("key2"),
StructValue(
Int32(2),
Utf8("B"),
),
),
])
Owned([
StructValue(
Int32(1),
Utf8("OPTION1"),
),
StructValue(
Int32(2),
Utf8("OPTION2"),
),
])"#]],
);

Ok(())
Expand Down
3 changes: 3 additions & 0 deletions src/connector/codec/tests/test_data/all-types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,7 @@ message AllTypes {
// wrapper types
google.protobuf.Int32Value int32_value_field = 27;
google.protobuf.StringValue string_value_field = 28;

map<string, NestedMessage> map_struct_field = 29;
map<int32, EnumType> map_enum_field = 30;
}

0 comments on commit 8cc9a55

Please sign in to comment.