From 99fe0ebb668374167b0ac6cadead65c6d7175c1b Mon Sep 17 00:00:00 2001 From: Zihao Xu Date: Wed, 1 Nov 2023 04:19:18 +0800 Subject: [PATCH] feat(protobuf): support any for protobuf message source (#12291) Co-authored-by: Rossil <40714231+Rossil2012@users.noreply.github.com> Co-authored-by: xzhseh --- src/connector/src/parser/protobuf/parser.rs | 508 +++++++++++++++++-- src/connector/src/parser/unified/protobuf.rs | 15 +- src/connector/src/test_data/any-schema.pb | 30 ++ src/connector/src/test_data/any-schema.proto | 38 ++ 4 files changed, 543 insertions(+), 48 deletions(-) create mode 100644 src/connector/src/test_data/any-schema.pb create mode 100644 src/connector/src/test_data/any-schema.proto diff --git a/src/connector/src/parser/protobuf/parser.rs b/src/connector/src/parser/protobuf/parser.rs index 0ca0235254000..732b7235a5146 100644 --- a/src/connector/src/parser/protobuf/parser.rs +++ b/src/connector/src/parser/protobuf/parser.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::path::Path; +use std::sync::Arc; use itertools::Itertools; use prost_reflect::{ @@ -23,7 +24,7 @@ use risingwave_common::array::{ListValue, StructValue}; use risingwave_common::error::ErrorCode::{InternalError, ProtocolError}; use risingwave_common::error::{Result, RwError}; use risingwave_common::try_match_expand; -use risingwave_common::types::{DataType, Datum, Decimal, ScalarImpl, F32, F64}; +use risingwave_common::types::{DataType, Datum, Decimal, JsonbVal, ScalarImpl, F32, F64}; use risingwave_pb::plan_common::ColumnDesc; use super::schema_resolver::*; @@ -39,6 +40,7 @@ use crate::schema::schema_registry::{ pub struct ProtobufAccessBuilder { confluent_wire_type: bool, message_descriptor: MessageDescriptor, + descriptor_pool: Arc, } impl AccessBuilder for ProtobufAccessBuilder { @@ -53,7 +55,10 @@ impl AccessBuilder for ProtobufAccessBuilder { let message = DynamicMessage::decode(self.message_descriptor.clone(), payload) .map_err(|e| ProtocolError(format!("parse message failed: {}", e)))?; - Ok(AccessImpl::Protobuf(ProtobufAccess::new(message))) + Ok(AccessImpl::Protobuf(ProtobufAccess::new( + message, + Arc::clone(&self.descriptor_pool), + ))) } } @@ -62,10 +67,13 @@ impl ProtobufAccessBuilder { let ProtobufParserConfig { confluent_wire_type, message_descriptor, + descriptor_pool, } = config; + Ok(Self { confluent_wire_type, message_descriptor, + descriptor_pool, }) } } @@ -74,6 +82,8 @@ impl ProtobufAccessBuilder { pub struct ProtobufParserConfig { confluent_wire_type: bool, pub(crate) message_descriptor: MessageDescriptor, + /// Note that the pub(crate) here is merely for testing + pub(crate) descriptor_pool: Arc, } impl ProtobufParserConfig { @@ -137,15 +147,18 @@ impl ProtobufParserConfig { location, e )) })?; + let message_descriptor = pool.get_message_by_name(message_name).ok_or_else(|| { ProtocolError(format!( - "cannot find message {} in schema: {}.\n poll is {:?}", + "Cannot find message {} in schema: {}.\nDescriptor pool is {:?}", message_name, location, pool )) })?; + Ok(Self { message_descriptor, confluent_wire_type: protobuf_config.use_schema_registry, + descriptor_pool: Arc::new(pool), }) } @@ -227,7 +240,142 @@ fn detect_loop_and_push(trace: &mut Vec, fd: &FieldDescriptor) -> Result Ok(()) } -pub fn from_protobuf_value(field_desc: &FieldDescriptor, value: &Value) -> Result { +fn extract_any_info(dyn_msg: &DynamicMessage) -> (String, Value) { + debug_assert!( + dyn_msg.fields().count() == 2, + "Expected only two fields for Any Type MessageDescriptor" + ); + + let type_url = dyn_msg + .get_field_by_name("type_url") + .expect("Expect type_url in dyn_msg") + .to_string() + .split('/') + .nth(1) + .map(|part| part[..part.len() - 1].to_string()) + .unwrap_or_default(); + + let payload = dyn_msg + .get_field_by_name("value") + .expect("Expect value (payload) in dyn_msg") + .as_ref() + .clone(); + + (type_url, payload) +} + +/// TODO: Resolve the potential naming conflict in the map +/// i.e., If the two anonymous type shares the same key (e.g., "Int32"), +/// the latter will overwrite the former one in `serde_json::Map`. +/// Possible solution, maintaining a global id map, for the same types +/// In the same level of fields, add the unique id at the tail of the name. +/// e.g., "Int32.1" & "Int32.2" in the above example +fn recursive_parse_json( + fields: &[Datum], + full_name_vec: Option>, + full_name: Option, +) -> serde_json::Value { + // Note that the key is of no order + let mut ret: serde_json::Map = serde_json::Map::new(); + + // The hidden type hint for user's convenience + // i.e., `"_type": message.full_name()` + if let Some(full_name) = full_name { + ret.insert("_type".to_string(), serde_json::Value::String(full_name)); + } + + for (idx, field) in fields.iter().enumerate() { + let mut key; + if let Some(k) = full_name_vec.as_ref() { + key = k[idx].to_string(); + } else { + key = "".to_string(); + } + + match field.clone() { + Some(ScalarImpl::Int16(v)) => { + if key.is_empty() { + key = "Int16".to_string(); + } + ret.insert(key, serde_json::Value::Number(serde_json::Number::from(v))); + } + Some(ScalarImpl::Int32(v)) => { + if key.is_empty() { + key = "Int32".to_string(); + } + ret.insert(key, serde_json::Value::Number(serde_json::Number::from(v))); + } + Some(ScalarImpl::Int64(v)) => { + if key.is_empty() { + key = "Int64".to_string(); + } + ret.insert(key, serde_json::Value::Number(serde_json::Number::from(v))); + } + Some(ScalarImpl::Bool(v)) => { + if key.is_empty() { + key = "Bool".to_string(); + } + ret.insert(key, serde_json::Value::Bool(v)); + } + Some(ScalarImpl::Bytea(v)) => { + if key.is_empty() { + key = "Bytea".to_string(); + } + let s = String::from_utf8(v.to_vec()).unwrap(); + ret.insert(key, serde_json::Value::String(s)); + } + Some(ScalarImpl::Float32(v)) => { + if key.is_empty() { + key = "Int16".to_string(); + } + ret.insert( + key, + serde_json::Value::Number( + serde_json::Number::from_f64(v.into_inner() as f64).unwrap(), + ), + ); + } + Some(ScalarImpl::Float64(v)) => { + if key.is_empty() { + key = "Float64".to_string(); + } + ret.insert( + key, + serde_json::Value::Number( + serde_json::Number::from_f64(v.into_inner()).unwrap(), + ), + ); + } + Some(ScalarImpl::Utf8(v)) => { + if key.is_empty() { + key = "Utf8".to_string(); + } + ret.insert(key, serde_json::Value::String(v.to_string())); + } + Some(ScalarImpl::Struct(v)) => { + if key.is_empty() { + key = "Struct".to_string(); + } + ret.insert(key, recursive_parse_json(v.fields(), None, None)); + } + Some(ScalarImpl::Jsonb(v)) => { + if key.is_empty() { + key = "Jsonb".to_string(); + } + ret.insert(key, v.take()); + } + r#type => panic!("Not yet support ScalarImpl type: {:?}", r#type), + } + } + + serde_json::Value::Object(ret) +} + +pub fn from_protobuf_value( + field_desc: &FieldDescriptor, + value: &Value, + descriptor_pool: &Arc, +) -> Result { let v = match value { Value::Bool(v) => ScalarImpl::Bool(*v), Value::I32(i) => ScalarImpl::Int32(*i), @@ -253,30 +401,95 @@ pub fn from_protobuf_value(field_desc: &FieldDescriptor, value: &Value) -> Resul ScalarImpl::Utf8(enum_symbol.name().into()) } Value::Message(dyn_msg) => { - let mut rw_values = Vec::with_capacity(dyn_msg.descriptor().fields().len()); - // fields is a btree map in descriptor - // so it's order is the same as datatype - for field_desc in dyn_msg.descriptor().fields() { - // missing field - if !dyn_msg.has_field(&field_desc) - && field_desc.cardinality() == Cardinality::Required - { - let err_msg = format!( - "protobuf parse error.missing required field {:?}", - field_desc - ); + if dyn_msg.descriptor().full_name() == "google.protobuf.Any" { + // If the fields are not presented, default value is an empty string + if !dyn_msg.has_field_by_name("type_url") || !dyn_msg.has_field_by_name("value") { + return Ok(Some(ScalarImpl::Jsonb(JsonbVal::from( + serde_json::json! {""}, + )))); + } + + // Sanity check + debug_assert!( + dyn_msg.has_field_by_name("type_url") && dyn_msg.has_field_by_name("value"), + "`type_url` & `value` must exist in fields of `dyn_msg`" + ); + + // The message is of type `Any` + let (type_url, payload) = extract_any_info(dyn_msg); + + let payload_field_desc = dyn_msg.descriptor().get_field_by_name("value").unwrap(); + + let Some(ScalarImpl::Bytea(payload)) = + from_protobuf_value(&payload_field_desc, &payload, descriptor_pool)? + else { + let err_msg = "Expected ScalarImpl::Bytea for payload".to_string(); return Err(RwError::from(ProtocolError(err_msg))); + }; + + // Get the corresponding schema from the descriptor pool + let msg_desc = descriptor_pool + .get_message_by_name(&type_url) + .ok_or_else(|| { + ProtocolError(format!( + "Cannot find message {} in from_protobuf_value.\nDescriptor pool is {:#?}", + type_url, descriptor_pool + )) + })?; + + let f = msg_desc + .clone() + .fields() + .map(|f| f.name().to_string()) + .collect::>(); + + let full_name = msg_desc.clone().full_name().to_string(); + + // Decode the payload based on the `msg_desc` + let decoded_value = DynamicMessage::decode(msg_desc, payload.as_ref()).unwrap(); + let decoded_value = from_protobuf_value( + field_desc, + &Value::Message(decoded_value), + descriptor_pool, + )? + .unwrap(); + + // Extract the struct value + let ScalarImpl::Struct(v) = decoded_value else { + panic!("Expect ScalarImpl::Struct"); + }; + + ScalarImpl::Jsonb(JsonbVal::from(serde_json::json!(recursive_parse_json( + v.fields(), + Some(f), + Some(full_name), + )))) + } else { + let mut rw_values = Vec::with_capacity(dyn_msg.descriptor().fields().len()); + // fields is a btree map in descriptor + // so it's order is the same as datatype + for field_desc in dyn_msg.descriptor().fields() { + // missing field + if !dyn_msg.has_field(&field_desc) + && field_desc.cardinality() == Cardinality::Required + { + let err_msg = format!( + "protobuf parse error.missing required field {:?}", + field_desc + ); + return Err(RwError::from(ProtocolError(err_msg))); + } + // use default value if dyn_msg doesn't has this field + let value = dyn_msg.get_field(&field_desc); + rw_values.push(from_protobuf_value(&field_desc, &value, descriptor_pool)?); } - // use default value if dyn_msg doesn't has this field - let value = dyn_msg.get_field(&field_desc); - rw_values.push(from_protobuf_value(&field_desc, &value)?); + ScalarImpl::Struct(StructValue::new(rw_values)) } - ScalarImpl::Struct(StructValue::new(rw_values)) } Value::List(values) => { let rw_values = values .iter() - .map(|value| from_protobuf_value(field_desc, value)) + .map(|value| from_protobuf_value(field_desc, value, descriptor_pool)) .collect::>>()?; ScalarImpl::List(ListValue::new(rw_values)) } @@ -316,7 +529,18 @@ fn protobuf_type_mapping( .map(|f| protobuf_type_mapping(&f, parse_trace)) .collect::>>()?; let field_names = m.fields().map(|f| f.name().to_string()).collect_vec(); - DataType::new_struct(fields, field_names) + + // Note that this part is useful for actual parsing + // Since RisingWave will parse message to `ScalarImpl::Jsonb` + // Please do NOT modify it + if field_names.len() == 2 + && field_names.contains(&"value".to_string()) + && field_names.contains(&"type_url".to_string()) + { + DataType::Jsonb + } else { + DataType::new_struct(fields, field_names) + } } Kind::Enum(_) => DataType::Varchar, Kind::Bytes => DataType::Bytea, @@ -337,7 +561,7 @@ fn protobuf_type_mapping( pub(crate) fn resolve_pb_header(payload: &[u8]) -> Result<&[u8]> { // there's a message index array at the front of payload // if it is the first message in proto def, the array is just and `0` - // TODO: support parsing more complex indec array + // TODO: support parsing more complex index array let (_, remained) = extract_schema_id(payload)?; match remained.first() { Some(0) => Ok(&remained[1..]), @@ -360,6 +584,7 @@ mod test { use risingwave_pb::catalog::StreamSourceInfo; use risingwave_pb::data::data_type::PbTypeName; use risingwave_pb::plan_common::{PbEncodeType, PbFormatType}; + use serde_json::json; use super::*; use crate::parser::protobuf::recursive::all_types::{EnumType, ExampleOneof, NestedMessage}; @@ -504,9 +729,11 @@ mod test { assert!(columns.is_err()); } - async fn create_recursive_pb_parser_config() -> ProtobufParserConfig { - let location = schema_dir() + "/proto_recursive/recursive.pb"; - let message_name = "recursive.AllTypes"; + async fn create_recursive_pb_parser_config( + location: &str, + message_name: &str, + ) -> ProtobufParserConfig { + let location = schema_dir() + location; let info = StreamSourceInfo { proto_message_name: message_name.to_string(), @@ -525,7 +752,11 @@ mod test { #[tokio::test] async fn test_all_types_create_source() { - let conf = create_recursive_pb_parser_config().await; + let conf = create_recursive_pb_parser_config( + "/proto_recursive/recursive.pb", + "recursive.AllTypes", + ) + .await; // Ensure that the parser can recognize the schema. let columns = conf @@ -569,10 +800,7 @@ mod test { ("seconds", DataType::Int64), ("nanos", DataType::Int32) ])), // duration_field - DataType::Struct(StructType::new(vec![ - ("type_url", DataType::Varchar), - ("value", DataType::Bytea), - ])), // any_field + DataType::Jsonb, // any_field DataType::Struct(StructType::new(vec![("value", DataType::Int32)])), /* int32_value_field */ DataType::Struct(StructType::new(vec![("value", DataType::Varchar)])), /* string_value_field */ ] @@ -585,7 +813,11 @@ mod test { let mut payload = Vec::new(); m.encode(&mut payload).unwrap(); - let conf = create_recursive_pb_parser_config().await; + let conf = create_recursive_pb_parser_config( + "/proto_recursive/recursive.pb", + "recursive.AllTypes", + ) + .await; let mut access_builder = ProtobufAccessBuilder::new(conf).unwrap(); let access = access_builder.generate_accessor(payload).await.unwrap(); if let AccessImpl::Protobuf(a) = access { @@ -648,18 +880,6 @@ mod test { Some(ScalarImpl::Int32(500000000)), ])), ); - pb_eq( - a, - "any_field", - S::Struct(StructValue::new(vec![ - Some(ScalarImpl::Utf8( - m.any_field.as_ref().unwrap().type_url.as_str().into(), - )), - Some(ScalarImpl::Bytea( - m.any_field.as_ref().unwrap().value.clone().into(), - )), - ])), - ); pb_eq( a, "int32_value_field", @@ -722,4 +942,204 @@ mod test { example_oneof: Some(ExampleOneof::OneofInt32(123)), } } + + // id: 12345 + // name { + // type_url: "type.googleapis.com/test.StringValue" + // value: "\n\010John Doe" + // } + static ANY_GEN_PROTO_DATA: &[u8] = b"\x08\xb9\x60\x12\x32\x0a\x24\x74\x79\x70\x65\x2e\x67\x6f\x6f\x67\x6c\x65\x61\x70\x69\x73\x2e\x63\x6f\x6d\x2f\x74\x65\x73\x74\x2e\x53\x74\x72\x69\x6e\x67\x56\x61\x6c\x75\x65\x12\x0a\x0a\x08\x4a\x6f\x68\x6e\x20\x44\x6f\x65"; + + #[tokio::test] + async fn test_any_schema() -> Result<()> { + let conf = create_recursive_pb_parser_config("/any-schema.pb", "test.TestAny").await; + + println!("Current conf: {:#?}", conf); + println!("---------------------------"); + + let value = + DynamicMessage::decode(conf.message_descriptor.clone(), ANY_GEN_PROTO_DATA).unwrap(); + + println!("Test ANY_GEN_PROTO_DATA, current value: {:#?}", value); + println!("---------------------------"); + + // This is of no use + let field = value.fields().next().unwrap().0; + + if let Some(ret) = + from_protobuf_value(&field, &Value::Message(value), &conf.descriptor_pool).unwrap() + { + println!("Decoded Value for ANY_GEN_PROTO_DATA: {:#?}", ret); + println!("---------------------------"); + + let ScalarImpl::Struct(struct_value) = ret else { + panic!("Expected ScalarImpl::Struct"); + }; + + let fields = struct_value.fields(); + + match fields[0].clone() { + Some(ScalarImpl::Int32(v)) => { + println!("Successfully decode field[0]"); + assert_eq!(v, 12345); + } + _ => panic!("Expected ScalarImpl::Int32"), + } + + match fields[1].clone() { + Some(ScalarImpl::Jsonb(jv)) => { + assert_eq!( + jv, + JsonbVal::from(json!({ + "_type": "test.StringValue", + "value": "John Doe" + })) + ); + } + _ => panic!("Expected ScalarImpl::Jsonb"), + } + } + + Ok(()) + } + + // id: 12345 + // name { + // type_url: "type.googleapis.com/test.Int32Value" + // value: "\010\322\376\006" + // } + // Unpacked Int32Value from Any: value: 114514 + static ANY_GEN_PROTO_DATA_1: &[u8] = b"\x08\xb9\x60\x12\x2b\x0a\x23\x74\x79\x70\x65\x2e\x67\x6f\x6f\x67\x6c\x65\x61\x70\x69\x73\x2e\x63\x6f\x6d\x2f\x74\x65\x73\x74\x2e\x49\x6e\x74\x33\x32\x56\x61\x6c\x75\x65\x12\x04\x08\xd2\xfe\x06"; + + #[tokio::test] + async fn test_any_schema_1() -> Result<()> { + let conf = create_recursive_pb_parser_config("/any-schema.pb", "test.TestAny").await; + + println!("Current conf: {:#?}", conf); + println!("---------------------------"); + + let value = + DynamicMessage::decode(conf.message_descriptor.clone(), ANY_GEN_PROTO_DATA_1).unwrap(); + + println!("Current Value: {:#?}", value); + println!("---------------------------"); + + // This is of no use + let field = value.fields().next().unwrap().0; + + if let Some(ret) = + from_protobuf_value(&field, &Value::Message(value), &conf.descriptor_pool).unwrap() + { + println!("Decoded Value for ANY_GEN_PROTO_DATA: {:#?}", ret); + println!("---------------------------"); + + let ScalarImpl::Struct(struct_value) = ret else { + panic!("Expected ScalarImpl::Struct"); + }; + + let fields = struct_value.fields(); + + match fields[0].clone() { + Some(ScalarImpl::Int32(v)) => { + println!("Successfully decode field[0]"); + assert_eq!(v, 12345); + } + _ => panic!("Expected ScalarImpl::Int32"), + } + + match fields[1].clone() { + Some(ScalarImpl::Jsonb(jv)) => { + assert_eq!( + jv, + JsonbVal::from(json!({ + "_type": "test.Int32Value", + "value": 114514 + })) + ); + } + _ => panic!("Expected ScalarImpl::Jsonb"), + } + } + + Ok(()) + } + + // "id": 12345, + // "any_value": { + // "type_url": "type.googleapis.com/test.AnyValue", + // "value": { + // "any_value_1": { + // "type_url": "type.googleapis.com/test.StringValue", + // "value": "114514" + // }, + // "any_value_2": { + // "type_url": "type.googleapis.com/test.Int32Value", + // "value": 114514 + // } + // } + // } + static ANY_RECURSIVE_GEN_PROTO_DATA: &[u8] = b"\x08\xb9\x60\x12\x84\x01\x0a\x21\x74\x79\x70\x65\x2e\x67\x6f\x6f\x67\x6c\x65\x61\x70\x69\x73\x2e\x63\x6f\x6d\x2f\x74\x65\x73\x74\x2e\x41\x6e\x79\x56\x61\x6c\x75\x65\x12\x5f\x0a\x30\x0a\x24\x74\x79\x70\x65\x2e\x67\x6f\x6f\x67\x6c\x65\x61\x70\x69\x73\x2e\x63\x6f\x6d\x2f\x74\x65\x73\x74\x2e\x53\x74\x72\x69\x6e\x67\x56\x61\x6c\x75\x65\x12\x08\x0a\x06\x31\x31\x34\x35\x31\x34\x12\x2b\x0a\x23\x74\x79\x70\x65\x2e\x67\x6f\x6f\x67\x6c\x65\x61\x70\x69\x73\x2e\x63\x6f\x6d\x2f\x74\x65\x73\x74\x2e\x49\x6e\x74\x33\x32\x56\x61\x6c\x75\x65\x12\x04\x08\xd2\xfe\x06"; + + #[tokio::test] + async fn test_any_recursive() -> Result<()> { + let conf = create_recursive_pb_parser_config("/any-schema.pb", "test.TestAny").await; + + println!("Current conf: {:#?}", conf); + println!("---------------------------"); + + let value = DynamicMessage::decode( + conf.message_descriptor.clone(), + ANY_RECURSIVE_GEN_PROTO_DATA, + ) + .unwrap(); + + println!("Current Value: {:#?}", value); + println!("---------------------------"); + + // This is of no use + let field = value.fields().next().unwrap().0; + + if let Some(ret) = + from_protobuf_value(&field, &Value::Message(value), &conf.descriptor_pool).unwrap() + { + println!("Decoded Value for ANY_RECURSIVE_GEN_PROTO_DATA: {:#?}", ret); + println!("---------------------------"); + + let ScalarImpl::Struct(struct_value) = ret else { + panic!("Expected ScalarImpl::Struct"); + }; + + let fields = struct_value.fields(); + + match fields[0].clone() { + Some(ScalarImpl::Int32(v)) => { + println!("Successfully decode field[0]"); + assert_eq!(v, 12345); + } + _ => panic!("Expected ScalarImpl::Int32"), + } + + match fields[1].clone() { + Some(ScalarImpl::Jsonb(jv)) => { + assert_eq!( + jv, + JsonbVal::from(json!({ + "_type": "test.AnyValue", + "any_value_1": { + "_type": "test.StringValue", + "value": "114514", + }, + "any_value_2": { + "_type": "test.Int32Value", + "value": 114514, + } + })) + ); + } + _ => panic!("Expected ScalarImpl::Jsonb"), + } + } + + Ok(()) + } } diff --git a/src/connector/src/parser/unified/protobuf.rs b/src/connector/src/parser/unified/protobuf.rs index 0afe0cde52d34..bd447b84cfb62 100644 --- a/src/connector/src/parser/unified/protobuf.rs +++ b/src/connector/src/parser/unified/protobuf.rs @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use anyhow::anyhow; -use prost_reflect::{DynamicMessage, ReflectMessage}; +use prost_reflect::{DescriptorPool, DynamicMessage, ReflectMessage}; use risingwave_common::error::ErrorCode::ProtocolError; use risingwave_common::error::RwError; use risingwave_common::types::DataType; @@ -24,11 +26,15 @@ use crate::parser::unified::AccessError; pub struct ProtobufAccess { message: DynamicMessage, + descriptor_pool: Arc, } impl ProtobufAccess { - pub fn new(message: DynamicMessage) -> Self { - Self { message } + pub fn new(message: DynamicMessage, descriptor_pool: Arc) -> Self { + Self { + message, + descriptor_pool, + } } } @@ -46,6 +52,7 @@ impl Access for ProtobufAccess { }) .map_err(|e| AccessError::Other(anyhow!(e)))?; let value = self.message.get_field(&field_desc); - from_protobuf_value(&field_desc, &value).map_err(|e| AccessError::Other(anyhow!(e))) + from_protobuf_value(&field_desc, &value, &self.descriptor_pool) + .map_err(|e| AccessError::Other(anyhow!(e))) } } diff --git a/src/connector/src/test_data/any-schema.pb b/src/connector/src/test_data/any-schema.pb new file mode 100644 index 0000000000000..977f64cec3775 --- /dev/null +++ b/src/connector/src/test_data/any-schema.pb @@ -0,0 +1,30 @@ + +ä +google/protobuf/any.protogoogle.protobuf"6 +Any +type_url ( RtypeUrl +value ( RvalueBv +com.google.protobufBAnyProtoPZ,google.golang.org/protobuf/types/known/anypb¢GPBªGoogle.Protobuf.WellKnownTypesbproto3 +á +any-schema.prototestgoogle/protobuf/any.proto"L +TestAny +id (Rid1 + any_value ( 2.google.protobuf.AnyRanyValue"# + StringValue +value ( Rvalue"" + +Int32Value +value (Rvalue"v +AnyValue4 + any_value_1 ( 2.google.protobuf.AnyR anyValue14 + any_value_2 ( 2.google.protobuf.AnyR anyValue2"@ +StringInt32Value +first ( Rfirst +second (Rsecond"Ž +StringStringInt32Value +first ( Rfirst. +second ( 2.test.StringInt32ValueRsecond. +third ( 2.test.Float32StringValueRthird"B +Float32StringValue +first (Rfirst +second ( Rsecondbproto3 \ No newline at end of file diff --git a/src/connector/src/test_data/any-schema.proto b/src/connector/src/test_data/any-schema.proto new file mode 100644 index 0000000000000..12a367100ce7d --- /dev/null +++ b/src/connector/src/test_data/any-schema.proto @@ -0,0 +1,38 @@ +syntax = "proto3"; + +import "google/protobuf/any.proto"; + +package test; +message TestAny { + int32 id = 1; + google.protobuf.Any any_value = 2; +} + +message StringValue { + string value = 1; +} + +message Int32Value { + int32 value = 1; +} + +message AnyValue { + google.protobuf.Any any_value_1 = 1; + google.protobuf.Any any_value_2 = 2; +} + +message StringInt32Value { + string first = 1; + int32 second = 2; +} + +message StringStringInt32Value { + string first = 1; + StringInt32Value second = 2; + Float32StringValue third = 3; +} + +message Float32StringValue { + float first = 1; + string second = 2; +} \ No newline at end of file