From 3f3a678cc5dcf586b74517f3a83016fc1a34c642 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 17 Sep 2024 11:07:23 +0800 Subject: [PATCH] feat(sink): support sink map to protobuf --- .../codec/tests/test_data/all-types.pb | Bin 2748 -> 3230 bytes src/connector/src/sink/encoder/proto.rs | 549 +++++++++++++++--- 2 files changed, 464 insertions(+), 85 deletions(-) diff --git a/src/connector/codec/tests/test_data/all-types.pb b/src/connector/codec/tests/test_data/all-types.pb index 177976d5244add199f5144fbce200152114733b4..7892bbbca1ca1f92580064e4131883d61d3a55d8 100644 GIT binary patch delta 446 zcmdlZI!|)LM;4~l{F6Vk%w+1|+uX+*&n#fa#hIH}5TBNrnv)_SrocS;AG?IOZ(@NP zNWe9(q^L581E>JZ-Ta2bpRryjkV^ohrMRT1G`R$-Q&xdlgWE`5FEJ-4zNE4swOG$F zCnp5V02&qyR*GaKAJ70KK_PE0kVt%LUTH4WKsm61s+b1K6zc&^bOowJGLHwS8zeN@ zjLWKC%9@K8s01M`B*4X-omwfusKBVf83bYqiEy!&CFYc-N-zP%Sc4!OD<%s@sT3|@ zpk^e;fdc`H5$4E7a2qMYoI06{L&^rpz*sIp3<}E%w$@|wYiTqo_Vt_ryt{FX>QZWK|DSH@%szp diff --git a/src/connector/src/sink/encoder/proto.rs b/src/connector/src/sink/encoder/proto.rs index 88fb445b1c3ba..eaea0fa69c7ce 100644 --- a/src/connector/src/sink/encoder/proto.rs +++ b/src/connector/src/sink/encoder/proto.rs @@ -19,7 +19,7 @@ use prost_reflect::{ }; use risingwave_common::catalog::Schema; use risingwave_common::row::Row; -use risingwave_common::types::{DataType, DatumRef, ScalarRefImpl, StructType}; +use risingwave_common::types::{DataType, DatumRef, MapType, ScalarRefImpl, StructType}; use risingwave_common::util::iter_util::ZipEqDebug; use super::{FieldEncodeError, Result as SinkResult, RowEncoder, SerTo}; @@ -183,6 +183,8 @@ trait MaybeData: std::fmt::Debug { fn on_struct(self, st: &StructType, pb: &MessageDescriptor) -> Result; fn on_list(self, elem: &DataType, pb: &FieldDescriptor) -> Result; + + fn on_map(self, m: &MapType, pb: &MessageDescriptor) -> Result; } impl MaybeData for () { @@ -197,7 +199,14 @@ impl MaybeData for () { } fn on_list(self, elem: &DataType, pb: &FieldDescriptor) -> Result { - encode_field(elem, (), pb, true) + on_field(elem, (), pb, true) + } + + fn on_map(self, elem: &MapType, pb: &MessageDescriptor) -> Result { + debug_assert!(pb.is_map_entry()); + on_field(elem.key(), (), &pb.map_entry_key_field(), false)?; + on_field(elem.value(), (), &pb.map_entry_value_field(), false)?; + Ok(()) } } @@ -225,7 +234,7 @@ impl MaybeData for ScalarRefImpl<'_> { let vs = d .iter() .map(|d| { - encode_field( + on_field( elem, d.ok_or_else(|| { FieldEncodeError::new("array containing null not allowed as repeated field") @@ -237,6 +246,27 @@ impl MaybeData for ScalarRefImpl<'_> { .try_collect()?; Ok(Value::List(vs)) } + + fn on_map(self, m: &MapType, pb: &MessageDescriptor) -> Result { + debug_assert!(pb.is_map_entry()); + let vs = self + .into_map() + .iter() + .map(|(k, v)| { + let v = + v.ok_or_else(|| FieldEncodeError::new("map containing null not allowed"))?; + let k = on_field(m.key(), k, &pb.map_entry_key_field(), false)?; + let v = on_field(m.value(), v, &pb.map_entry_value_field(), false)?; + Ok(( + k.into_map_key().ok_or_else(|| { + FieldEncodeError::new("failed to convert map key to proto") + })?, + v, + )) + }) + .try_collect()?; + Ok(Value::Map(vs)) + } } fn validate_fields<'a>( @@ -250,7 +280,7 @@ fn validate_fields<'a>( if proto_field.cardinality() == prost_reflect::Cardinality::Required { return Err(FieldEncodeError::new("`required` not supported").with_name(name)); } - encode_field(t, (), &proto_field, false).map_err(|e| e.with_name(name))?; + on_field(t, (), &proto_field, false).map_err(|e| e.with_name(name))?; } Ok(()) } @@ -264,8 +294,7 @@ fn encode_fields<'a>( let proto_field = descriptor.get_field_by_name(name).unwrap(); // On `null`, simply skip setting the field. if let Some(scalar) = d { - let value = - encode_field(t, scalar, &proto_field, false).map_err(|e| e.with_name(name))?; + let value = on_field(t, scalar, &proto_field, false).map_err(|e| e.with_name(name))?; message .try_set_field(&proto_field, value) .map_err(|e| FieldEncodeError::new(e).with_name(name))?; @@ -281,7 +310,7 @@ const WKT_BOOL_VALUE: &str = "google.protobuf.BoolValue"; /// Handles both `validate` (without actual data) and `encode`. /// See [`MaybeData`] for more info. -fn encode_field( +fn on_field( data_type: &DataType, maybe: D, proto_field: &FieldDescriptor, @@ -295,10 +324,8 @@ fn encode_field( // In the bottom 2 cases, we need to distinguish the same `proto_field` with the help of `in_repeated`. assert!(proto_field.is_list() || !in_repeated); let expect_list = proto_field.is_list() && !in_repeated; - if proto_field.is_map() || proto_field.is_group() { - return Err(FieldEncodeError::new( - "proto map or group not supported yet", - )); + if proto_field.is_group() { + return Err(FieldEncodeError::new("proto group not supported yet")); } let no_match_err = || { @@ -309,44 +336,49 @@ fn encode_field( proto_field.kind() ))) }; + + if expect_list && !matches!(data_type, DataType::List(_)) { + return no_match_err(); + } + let value = match &data_type { // Group A: perfect match between RisingWave types and ProtoBuf types - DataType::Boolean => match (expect_list, proto_field.kind()) { - (false, Kind::Bool) => maybe.on_base(|s| Ok(Value::Bool(s.into_bool())))?, + DataType::Boolean => match proto_field.kind() { + Kind::Bool => maybe.on_base(|s| Ok(Value::Bool(s.into_bool())))?, _ => return no_match_err(), }, - DataType::Varchar => match (expect_list, proto_field.kind()) { - (false, Kind::String) => maybe.on_base(|s| Ok(Value::String(s.into_utf8().into())))?, + DataType::Varchar => match proto_field.kind() { + Kind::String => maybe.on_base(|s| Ok(Value::String(s.into_utf8().into())))?, _ => return no_match_err(), }, - DataType::Bytea => match (expect_list, proto_field.kind()) { - (false, Kind::Bytes) => { + DataType::Bytea => match proto_field.kind() { + Kind::Bytes => { maybe.on_base(|s| Ok(Value::Bytes(Bytes::copy_from_slice(s.into_bytea()))))? } _ => return no_match_err(), }, - DataType::Float32 => match (expect_list, proto_field.kind()) { - (false, Kind::Float) => maybe.on_base(|s| Ok(Value::F32(s.into_float32().into())))?, + DataType::Float32 => match proto_field.kind() { + Kind::Float => maybe.on_base(|s| Ok(Value::F32(s.into_float32().into())))?, _ => return no_match_err(), }, - DataType::Float64 => match (expect_list, proto_field.kind()) { - (false, Kind::Double) => maybe.on_base(|s| Ok(Value::F64(s.into_float64().into())))?, + DataType::Float64 => match proto_field.kind() { + Kind::Double => maybe.on_base(|s| Ok(Value::F64(s.into_float64().into())))?, _ => return no_match_err(), }, - DataType::Int32 => match (expect_list, proto_field.kind()) { - (false, Kind::Int32 | Kind::Sint32 | Kind::Sfixed32) => { + DataType::Int32 => match proto_field.kind() { + Kind::Int32 | Kind::Sint32 | Kind::Sfixed32 => { maybe.on_base(|s| Ok(Value::I32(s.into_int32())))? } _ => return no_match_err(), }, - DataType::Int64 => match (expect_list, proto_field.kind()) { - (false, Kind::Int64 | Kind::Sint64 | Kind::Sfixed64) => { + DataType::Int64 => match proto_field.kind() { + Kind::Int64 | Kind::Sint64 | Kind::Sfixed64 => { maybe.on_base(|s| Ok(Value::I64(s.into_int64())))? } _ => return no_match_err(), }, - DataType::Struct(st) => match (expect_list, proto_field.kind()) { - (false, Kind::Message(pb)) => maybe.on_struct(st, &pb)?, + DataType::Struct(st) => match proto_field.kind() { + Kind::Message(pb) => maybe.on_struct(st, &pb)?, _ => return no_match_err(), }, DataType::List(elem) => match expect_list { @@ -354,75 +386,70 @@ fn encode_field( false => return no_match_err(), }, // Group B: match between RisingWave types and ProtoBuf Well-Known types - DataType::Timestamptz => match (expect_list, proto_field.kind()) { - (false, Kind::Message(pb)) if pb.full_name() == WKT_TIMESTAMP => { - maybe.on_base(|s| { - let d = s.into_timestamptz(); - let message = prost_types::Timestamp { - seconds: d.timestamp(), - nanos: d.timestamp_subsec_nanos().try_into().unwrap(), - }; - Ok(Value::Message(message.transcode_to_dynamic())) - })? - } - (false, Kind::String) => { + DataType::Timestamptz => match proto_field.kind() { + Kind::Message(pb) if pb.full_name() == WKT_TIMESTAMP => maybe.on_base(|s| { + let d = s.into_timestamptz(); + let message = prost_types::Timestamp { + seconds: d.timestamp(), + nanos: d.timestamp_subsec_nanos().try_into().unwrap(), + }; + Ok(Value::Message(message.transcode_to_dynamic())) + })?, + Kind::String => { maybe.on_base(|s| Ok(Value::String(s.into_timestamptz().to_string())))? } _ => return no_match_err(), }, - DataType::Jsonb => match (expect_list, proto_field.kind()) { - (false, Kind::String) => { - maybe.on_base(|s| Ok(Value::String(s.into_jsonb().to_string())))? - } + DataType::Jsonb => match proto_field.kind() { + Kind::String => maybe.on_base(|s| Ok(Value::String(s.into_jsonb().to_string())))?, _ => return no_match_err(), /* Value, NullValue, Struct (map), ListValue * Group C: experimental */ }, - DataType::Int16 => match (expect_list, proto_field.kind()) { - (false, Kind::Int64) => maybe.on_base(|s| Ok(Value::I64(s.into_int16() as i64)))?, + DataType::Int16 => match proto_field.kind() { + Kind::Int64 => maybe.on_base(|s| Ok(Value::I64(s.into_int16() as i64)))?, _ => return no_match_err(), }, - DataType::Date => match (expect_list, proto_field.kind()) { - (false, Kind::Int32) => { + DataType::Date => match proto_field.kind() { + Kind::Int32 => { maybe.on_base(|s| Ok(Value::I32(s.into_date().get_nums_days_unix_epoch())))? } _ => return no_match_err(), // google.type.Date }, - DataType::Time => match (expect_list, proto_field.kind()) { - (false, Kind::String) => { - maybe.on_base(|s| Ok(Value::String(s.into_time().to_string())))? - } + DataType::Time => match proto_field.kind() { + Kind::String => maybe.on_base(|s| Ok(Value::String(s.into_time().to_string())))?, _ => return no_match_err(), // google.type.TimeOfDay }, - DataType::Timestamp => match (expect_list, proto_field.kind()) { - (false, Kind::String) => { - maybe.on_base(|s| Ok(Value::String(s.into_timestamp().to_string())))? - } + DataType::Timestamp => match proto_field.kind() { + Kind::String => maybe.on_base(|s| Ok(Value::String(s.into_timestamp().to_string())))?, _ => return no_match_err(), // google.type.DateTime }, - DataType::Decimal => match (expect_list, proto_field.kind()) { - (false, Kind::String) => { - maybe.on_base(|s| Ok(Value::String(s.into_decimal().to_string())))? - } + DataType::Decimal => match proto_field.kind() { + Kind::String => maybe.on_base(|s| Ok(Value::String(s.into_decimal().to_string())))?, _ => return no_match_err(), // google.type.Decimal }, - DataType::Interval => match (expect_list, proto_field.kind()) { - (false, Kind::String) => { + DataType::Interval => match proto_field.kind() { + Kind::String => { maybe.on_base(|s| Ok(Value::String(s.into_interval().as_iso_8601())))? } _ => return no_match_err(), // Group D: unsupported }, - DataType::Serial => match (expect_list, proto_field.kind()) { - (false, Kind::Int64) => { - maybe.on_base(|s| Ok(Value::I64(s.into_serial().as_row_id())))? - } + DataType::Serial => match proto_field.kind() { + Kind::Int64 => maybe.on_base(|s| Ok(Value::I64(s.into_serial().as_row_id())))?, _ => return no_match_err(), // Group D: unsupported }, DataType::Int256 => { return no_match_err(); } - DataType::Map(_) => { - // TODO(map): support map - return no_match_err(); + DataType::Map(map_type) => { + if proto_field.is_map() { + let msg = match proto_field.kind() { + Kind::Message(m) => m, + _ => return no_match_err(), // unreachable actually + }; + return maybe.on_map(map_type, &msg); + } else { + return no_match_err(); + } } }; @@ -431,9 +458,13 @@ fn encode_field( #[cfg(test)] mod tests { + use itertools::Itertools; + use risingwave_common::array::{ArrayBuilder, StructArrayBuilder}; use risingwave_common::catalog::Field; use risingwave_common::row::OwnedRow; - use risingwave_common::types::{ListValue, ScalarImpl, StructValue, Timestamptz}; + use risingwave_common::types::{ + ListValue, MapType, MapValue, Scalar, ScalarImpl, StructValue, Timestamptz, + }; use super::*; @@ -465,6 +496,20 @@ mod tests { ), Field::with_name(DataType::List(DataType::Int32.into()), "repeated_int_field"), Field::with_name(DataType::Timestamptz, "timestamp_field"), + Field::with_name( + DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Int32)), + "map_field", + ), + Field::with_name( + DataType::Map(MapType::from_kv( + DataType::Varchar, + DataType::Struct(StructType::new(vec![ + ("id", DataType::Int32), + ("name", DataType::Varchar), + ])), + )), + "map_struct_field", + ), ]); let row = OwnedRow::new(vec![ Some(ScalarImpl::Bool(true)), @@ -484,26 +529,360 @@ mod tests { ]))), Some(ScalarImpl::List(ListValue::from_iter([4, 0, 4]))), Some(ScalarImpl::Timestamptz(Timestamptz::from_micros(3))), + Some(ScalarImpl::Map( + MapValue::try_from_kv( + ListValue::from_iter(["a", "b"]), + ListValue::from_iter([1, 2]), + ) + .unwrap(), + )), + { + let mut struct_array_builder = StructArrayBuilder::with_type( + 2, + DataType::Struct(StructType::new(vec![ + ("id", DataType::Int32), + ("name", DataType::Varchar), + ])), + ); + struct_array_builder.append(Some( + StructValue::new(vec![ + Some(ScalarImpl::Int32(1)), + Some(ScalarImpl::Utf8("x".into())), + ]) + .as_scalar_ref(), + )); + struct_array_builder.append(Some( + StructValue::new(vec![ + Some(ScalarImpl::Int32(2)), + Some(ScalarImpl::Utf8("y".into())), + ]) + .as_scalar_ref(), + )); + Some(ScalarImpl::Map( + MapValue::try_from_kv( + ListValue::from_iter(["a", "b"]), + ListValue::new(struct_array_builder.finish().into()), + ) + .unwrap(), + )) + }, ]); let encoder = ProtoEncoder::new(schema, None, descriptor.clone(), ProtoHeader::None).unwrap(); let m = encoder.encode(row).unwrap(); - let encoded: Vec = m.ser_to().unwrap(); - assert_eq!( - encoded, - // Hint: write the binary output to a file `test.binpb`, and view it with `protoc`: - // ``` - // protoc --decode_raw < test.binpb - // protoc --decode=all_types.AllTypes all-types.proto < test.binpb - // ``` - [ - 9, 0, 0, 0, 0, 0, 0, 17, 64, 21, 0, 0, 96, 64, 24, 22, 32, 23, 56, 48, 93, 26, 0, - 0, 0, 97, 27, 0, 0, 0, 0, 0, 0, 0, 104, 1, 114, 10, 82, 105, 115, 105, 110, 103, - 87, 97, 118, 101, 122, 2, 190, 239, 138, 1, 2, 8, 1, 146, 1, 3, 4, 0, 4, 186, 1, 3, - 16, 184, 23 - ] - ); + expect_test::expect![[r#" + field: FieldDescriptor { + name: "double_field", + full_name: "all_types.AllTypes.double_field", + json_name: "doubleField", + number: 1, + kind: double, + cardinality: Optional, + containing_oneof: None, + default_value: None, + is_group: false, + is_list: false, + is_map: false, + is_packed: false, + supports_presence: false, + } + + value: F64(4.25) + + ============================== + field: FieldDescriptor { + name: "float_field", + full_name: "all_types.AllTypes.float_field", + json_name: "floatField", + number: 2, + kind: float, + cardinality: Optional, + containing_oneof: None, + default_value: None, + is_group: false, + is_list: false, + is_map: false, + is_packed: false, + supports_presence: false, + } + + value: F32(3.5) + + ============================== + field: FieldDescriptor { + name: "int32_field", + full_name: "all_types.AllTypes.int32_field", + json_name: "int32Field", + number: 3, + kind: int32, + cardinality: Optional, + containing_oneof: None, + default_value: None, + is_group: false, + is_list: false, + is_map: false, + is_packed: false, + supports_presence: false, + } + + value: I32(22) + + ============================== + field: FieldDescriptor { + name: "int64_field", + full_name: "all_types.AllTypes.int64_field", + json_name: "int64Field", + number: 4, + kind: int64, + cardinality: Optional, + containing_oneof: None, + default_value: None, + is_group: false, + is_list: false, + is_map: false, + is_packed: false, + supports_presence: false, + } + + value: I64(23) + + ============================== + field: FieldDescriptor { + name: "sint32_field", + full_name: "all_types.AllTypes.sint32_field", + json_name: "sint32Field", + number: 7, + kind: sint32, + cardinality: Optional, + containing_oneof: None, + default_value: None, + is_group: false, + is_list: false, + is_map: false, + is_packed: false, + supports_presence: false, + } + + value: I32(24) + + ============================== + field: FieldDescriptor { + name: "sfixed32_field", + full_name: "all_types.AllTypes.sfixed32_field", + json_name: "sfixed32Field", + number: 11, + kind: sfixed32, + cardinality: Optional, + containing_oneof: None, + default_value: None, + is_group: false, + is_list: false, + is_map: false, + is_packed: false, + supports_presence: false, + } + + value: I32(26) + + ============================== + field: FieldDescriptor { + name: "sfixed64_field", + full_name: "all_types.AllTypes.sfixed64_field", + json_name: "sfixed64Field", + number: 12, + kind: sfixed64, + cardinality: Optional, + containing_oneof: None, + default_value: None, + is_group: false, + is_list: false, + is_map: false, + is_packed: false, + supports_presence: false, + } + + value: I64(27) + + ============================== + field: FieldDescriptor { + name: "bool_field", + full_name: "all_types.AllTypes.bool_field", + json_name: "boolField", + number: 13, + kind: bool, + cardinality: Optional, + containing_oneof: None, + default_value: None, + is_group: false, + is_list: false, + is_map: false, + is_packed: false, + supports_presence: false, + } + + value: Bool(true) + + ============================== + field: FieldDescriptor { + name: "string_field", + full_name: "all_types.AllTypes.string_field", + json_name: "stringField", + number: 14, + kind: string, + cardinality: Optional, + containing_oneof: None, + default_value: None, + is_group: false, + is_list: false, + is_map: false, + is_packed: false, + supports_presence: false, + } + + value: String("RisingWave") + + ============================== + field: FieldDescriptor { + name: "bytes_field", + full_name: "all_types.AllTypes.bytes_field", + json_name: "bytesField", + number: 15, + kind: bytes, + cardinality: Optional, + containing_oneof: None, + default_value: None, + is_group: false, + is_list: false, + is_map: false, + is_packed: false, + supports_presence: false, + } + + value: Bytes(b"\xbe\xef") + + ============================== + field: FieldDescriptor { + name: "nested_message_field", + full_name: "all_types.AllTypes.nested_message_field", + json_name: "nestedMessageField", + number: 17, + kind: all_types.AllTypes.NestedMessage, + cardinality: Optional, + containing_oneof: None, + default_value: None, + is_group: false, + is_list: false, + is_map: false, + is_packed: false, + supports_presence: true, + } + + value: Message(DynamicMessage { desc: MessageDescriptor { name: "NestedMessage", full_name: "all_types.AllTypes.NestedMessage", is_map_entry: false, fields: [FieldDescriptor { name: "id", full_name: "all_types.AllTypes.NestedMessage.id", json_name: "id", number: 1, kind: int32, cardinality: Optional, containing_oneof: None, default_value: None, is_group: false, is_list: false, is_map: false, is_packed: false, supports_presence: false }, FieldDescriptor { name: "name", full_name: "all_types.AllTypes.NestedMessage.name", json_name: "name", number: 2, kind: string, cardinality: Optional, containing_oneof: None, default_value: None, is_group: false, is_list: false, is_map: false, is_packed: false, supports_presence: false }], oneofs: [] }, fields: DynamicMessageFieldSet { fields: {1: Value(I32(1)), 2: Value(String(""))} } }) + + ============================== + field: FieldDescriptor { + name: "repeated_int_field", + full_name: "all_types.AllTypes.repeated_int_field", + json_name: "repeatedIntField", + number: 18, + kind: int32, + cardinality: Repeated, + containing_oneof: None, + default_value: None, + is_group: false, + is_list: true, + is_map: false, + is_packed: true, + supports_presence: false, + } + + value: List([I32(4), I32(0), I32(4)]) + + ============================== + field: FieldDescriptor { + name: "map_field", + full_name: "all_types.AllTypes.map_field", + json_name: "mapField", + number: 22, + kind: all_types.AllTypes.MapFieldEntry, + cardinality: Repeated, + containing_oneof: None, + default_value: None, + is_group: false, + is_list: false, + is_map: true, + is_packed: false, + supports_presence: false, + } + + value: Map({ + String("a"): I32(1), + String("b"): I32(2), + }) + + ============================== + field: FieldDescriptor { + name: "timestamp_field", + full_name: "all_types.AllTypes.timestamp_field", + json_name: "timestampField", + number: 23, + kind: google.protobuf.Timestamp, + cardinality: Optional, + containing_oneof: None, + default_value: None, + is_group: false, + is_list: false, + is_map: false, + is_packed: false, + supports_presence: true, + } + + value: Message(DynamicMessage { desc: MessageDescriptor { name: "Timestamp", full_name: "google.protobuf.Timestamp", is_map_entry: false, fields: [FieldDescriptor { name: "seconds", full_name: "google.protobuf.Timestamp.seconds", json_name: "seconds", number: 1, kind: int64, cardinality: Optional, containing_oneof: None, default_value: None, is_group: false, is_list: false, is_map: false, is_packed: false, supports_presence: false }, FieldDescriptor { name: "nanos", full_name: "google.protobuf.Timestamp.nanos", json_name: "nanos", number: 2, kind: int32, cardinality: Optional, containing_oneof: None, default_value: None, is_group: false, is_list: false, is_map: false, is_packed: false, supports_presence: false }], oneofs: [] }, fields: DynamicMessageFieldSet { fields: {2: Value(I32(3000))} } }) + + ============================== + field: FieldDescriptor { + name: "map_struct_field", + full_name: "all_types.AllTypes.map_struct_field", + json_name: "mapStructField", + number: 29, + kind: all_types.AllTypes.MapStructFieldEntry, + cardinality: Repeated, + containing_oneof: None, + default_value: None, + is_group: false, + is_list: false, + is_map: true, + is_packed: false, + supports_presence: false, + } + + value: Map({ + String("a"): Message(DynamicMessage { desc: MessageDescriptor { name: "NestedMessage", full_name: "all_types.AllTypes.NestedMessage", is_map_entry: false, fields: [FieldDescriptor { name: "id", full_name: "all_types.AllTypes.NestedMessage.id", json_name: "id", number: 1, kind: int32, cardinality: Optional, containing_oneof: None, default_value: None, is_group: false, is_list: false, is_map: false, is_packed: false, supports_presence: false }, FieldDescriptor { name: "name", full_name: "all_types.AllTypes.NestedMessage.name", json_name: "name", number: 2, kind: string, cardinality: Optional, containing_oneof: None, default_value: None, is_group: false, is_list: false, is_map: false, is_packed: false, supports_presence: false }], oneofs: [] }, fields: DynamicMessageFieldSet { fields: {1: Value(I32(1)), 2: Value(String("x"))} } }), + String("b"): Message(DynamicMessage { desc: MessageDescriptor { name: "NestedMessage", full_name: "all_types.AllTypes.NestedMessage", is_map_entry: false, fields: [FieldDescriptor { name: "id", full_name: "all_types.AllTypes.NestedMessage.id", json_name: "id", number: 1, kind: int32, cardinality: Optional, containing_oneof: None, default_value: None, is_group: false, is_list: false, is_map: false, is_packed: false, supports_presence: false }, FieldDescriptor { name: "name", full_name: "all_types.AllTypes.NestedMessage.name", json_name: "name", number: 2, kind: string, cardinality: Optional, containing_oneof: None, default_value: None, is_group: false, is_list: false, is_map: false, is_packed: false, supports_presence: false }], oneofs: [] }, fields: DynamicMessageFieldSet { fields: {1: Value(I32(2)), 2: Value(String("y"))} } }), + })"#]].assert_eq(&format!("{}", + m.message.fields().format_with("\n\n==============================\n", |(field,value),f| { + f(&format!("field: {:#?}\n\nvalue: {}", field, print_proto(value))) + }))); + } + + fn print_proto(value: &Value) -> String { + match value { + Value::Map(m) => { + let mut res = String::new(); + res.push_str("Map({\n"); + for (k, v) in m.iter().sorted_by_key(|(k, _v)| *k) { + res.push_str(&format!( + " {}: {}, \n", + print_proto(&k.clone().into()), + print_proto(v) + )); + } + res.push_str("})"); + res + } + _ => format!("{:?}", value), + } } #[test] @@ -583,7 +962,7 @@ mod tests { .unwrap_err(); assert_eq!( err.to_string(), - "encode map_field error: field not in proto" + "encode map_field error: cannot encode jsonb column as all_types.AllTypes.MapFieldEntry field" ); } }