diff --git a/e2e_test/source_inline/kafka/protobuf/recover.slt b/e2e_test/source_inline/kafka/protobuf/recover.slt new file mode 100644 index 0000000000000..3babf26793f2a --- /dev/null +++ b/e2e_test/source_inline/kafka/protobuf/recover.slt @@ -0,0 +1,97 @@ +control substitution on + +system ok +rpk topic create 'test-pb-struct' + + +system ok +jq -sR '{"schema":.,"schemaType":"PROTOBUF"}' << EOF | curl -X POST -H 'content-type: application/json' -d @- "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/test-pb-struct-value/versions" +syntax = "proto3"; +package test; +message User { + int32 id = 1; + Name name = 2; +} +message Name { + string first_name = 1; + string last_name = 2; +} +EOF + + +# create a source with v1 schema +statement ok +create source s with ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'test-pb-struct') +format plain encode protobuf ( + schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}', + message = 'test.User'); + + +# register a v2 schema +system ok +jq -sR '{"schema":.,"schemaType":"PROTOBUF"}' << EOF | curl -X POST -H 'content-type: application/json' -d @- "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/test-pb-struct-value/versions" +syntax = "proto3"; +package test; +message User { + int32 id = 1; + Name name = 2; +} +message Name { + string first_name = 1; + string last_name = 2; + string middle_name = 3; +} +EOF + + +# trigger recovery +statement ok +recover; + + +sleep 2s + + +# produce a v2 message +statement ok +create sink sk as select + 1 as id, + row('Alan', 'Turing', 'Mathison')::struct as name +with ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'test-pb-struct') +format plain encode protobuf ( + schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}', + message = 'test.User'); + + +sleep 1s + + +# reading as v1 shall not panic +query IT +select * from s; +---- +1 (Alan,Turing) + + +statement ok +drop sink sk; + + +statement ok +drop source s; + + +system ok +curl -X DELETE "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/test-pb-struct-value" + + +system ok +curl -X DELETE "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/test-pb-struct-value?permanent=true" + + +system ok +rpk topic delete 'test-pb-struct' diff --git a/integration_tests/twitter-pulsar/pb/create_source.sql b/integration_tests/twitter-pulsar/pb/create_source.sql index bf41939b40d91..22c4927ab3bb9 100644 --- a/integration_tests/twitter-pulsar/pb/create_source.sql +++ b/integration_tests/twitter-pulsar/pb/create_source.sql @@ -1,5 +1,6 @@ CREATE SOURCE twitter WITH ( connector = 'pulsar', pulsar.topic = 'twitter', - pulsar.service.url = 'pulsar://message_queue:6650' + pulsar.service.url = 'pulsar://message_queue:6650', + subscription.name.prefix = 'custom_prefix' ) ROW FORMAT PROTOBUF MESSAGE 'twitter.schema.Event' ROW SCHEMA LOCATION 'http://file_server:8080/schema'; \ No newline at end of file diff --git a/src/connector/src/parser/protobuf/parser.rs b/src/connector/src/parser/protobuf/parser.rs index ec8c747cafd5a..bbd1d3f0da1e3 100644 --- a/src/connector/src/parser/protobuf/parser.rs +++ b/src/connector/src/parser/protobuf/parser.rs @@ -25,7 +25,7 @@ use risingwave_common::types::{ use risingwave_common::{bail, try_match_expand}; use risingwave_pb::plan_common::{AdditionalColumn, ColumnDesc, ColumnDescVersion}; use thiserror::Error; -use thiserror_ext::{AsReport, Macro}; +use thiserror_ext::Macro; use crate::error::ConnectorResult; use crate::parser::unified::protobuf::ProtobufAccess; @@ -205,6 +205,7 @@ fn detect_loop_and_push( pub fn from_protobuf_value<'a>( field_desc: &FieldDescriptor, value: &'a Value, + type_expected: &DataType, ) -> AccessResult> { let kind = field_desc.kind(); @@ -240,32 +241,42 @@ pub fn from_protobuf_value<'a>( serde_json::to_value(dyn_msg).map_err(AccessError::ProtobufAnyToJson)?, )) } else { - let mut rw_values = Vec::with_capacity(dyn_msg.descriptor().fields().len()); - // fields is a btree map in descriptor - // so it's order is the same as datatype - for field_desc in dyn_msg.descriptor().fields() { - // missing field - if !dyn_msg.has_field(&field_desc) - && field_desc.cardinality() == Cardinality::Required - { - return Err(AccessError::Undefined { - name: field_desc.name().to_owned(), - path: dyn_msg.descriptor().full_name().to_owned(), - }); - } - // use default value if dyn_msg doesn't has this field + let desc = dyn_msg.descriptor(); + let DataType::Struct(st) = type_expected else { + return Err(AccessError::TypeError { + expected: type_expected.to_string(), + got: desc.full_name().to_string(), + value: value.to_string(), // Protobuf TEXT + }); + }; + + let mut rw_values = Vec::with_capacity(st.len()); + for (name, expected_field_type) in st.iter() { + let Some(field_desc) = desc.get_field_by_name(name) else { + // Field deleted in protobuf. Fallback to SQL NULL (of proper RW type). + rw_values.push(None); + continue; + }; let value = dyn_msg.get_field(&field_desc); - rw_values.push(from_protobuf_value(&field_desc, &value)?.to_owned_datum()); + rw_values.push( + from_protobuf_value(&field_desc, &value, expected_field_type)? + .to_owned_datum(), + ); } ScalarImpl::Struct(StructValue::new(rw_values)) } } Value::List(values) => { - let data_type = protobuf_type_mapping(field_desc, &mut vec![]) - .map_err(|e| uncategorized!("{}", e.to_report_string()))?; - let mut builder = data_type.as_list().create_array_builder(values.len()); + let DataType::List(element_type) = type_expected else { + return Err(AccessError::TypeError { + expected: type_expected.to_string(), + got: format!("repeated {:?}", kind), + value: value.to_string(), // Protobuf TEXT + }); + }; + let mut builder = element_type.create_array_builder(values.len()); for value in values { - builder.append(from_protobuf_value(field_desc, value)?); + builder.append(from_protobuf_value(field_desc, value, element_type)?); } ScalarImpl::List(ListValue::new(builder.finish())) } @@ -389,6 +400,7 @@ mod test { use risingwave_pb::data::data_type::PbTypeName; use risingwave_pb::plan_common::{PbEncodeType, PbFormatType}; use serde_json::json; + use thiserror_ext::AsReport as _; use super::*; use crate::parser::protobuf::recursive::all_types::{EnumType, ExampleOneof, NestedMessage}; @@ -696,7 +708,8 @@ mod test { } fn pb_eq(a: &ProtobufAccess, field_name: &str, value: ScalarImpl) { - let dummy_type = DataType::Varchar; + let field = a.descriptor().get_field_by_name(field_name).unwrap(); + let dummy_type = protobuf_type_mapping(&field, &mut vec![]).unwrap(); let d = a.access_owned(&[field_name], &dummy_type).unwrap().unwrap(); assert_eq!(d, value, "field: {} value: {:?}", field_name, d); } @@ -756,48 +769,35 @@ mod test { println!("Current conf: {:#?}", conf); println!("---------------------------"); - let value = + let message = DynamicMessage::decode(conf.message_descriptor.clone(), ANY_GEN_PROTO_DATA).unwrap(); - println!("Test ANY_GEN_PROTO_DATA, current value: {:#?}", value); + println!("Test ANY_GEN_PROTO_DATA, current value: {:#?}", message); println!("---------------------------"); - // This is of no use - let field = value.fields().next().unwrap().0; + let field = conf + .message_descriptor + .get_field_by_name("any_value") + .unwrap(); + let value = message.get_field(&field); - if let Some(ret) = from_protobuf_value(&field, &Value::Message(value)) + let ret = from_protobuf_value(&field, &value, &DataType::Jsonb) .unwrap() - .to_owned_datum() - { - println!("Decoded Value for ANY_GEN_PROTO_DATA: {:#?}", ret); - println!("---------------------------"); - - let ScalarImpl::Struct(struct_value) = ret else { - panic!("Expected ScalarImpl::Struct"); - }; - - let fields = struct_value.fields(); + .to_owned_datum(); + println!("Decoded Value for ANY_GEN_PROTO_DATA: {:#?}", ret); + println!("---------------------------"); - match fields[0].clone() { - Some(ScalarImpl::Int32(v)) => { - println!("Successfully decode field[0]"); - assert_eq!(v, 12345); - } - _ => panic!("Expected ScalarImpl::Int32"), - } - - match fields[1].clone() { - Some(ScalarImpl::Jsonb(jv)) => { - assert_eq!( - jv, - JsonbVal::from(json!({ - "@type": "type.googleapis.com/test.StringValue", - "value": "John Doe" - })) - ); - } - _ => panic!("Expected ScalarImpl::Jsonb"), + match ret { + Some(ScalarImpl::Jsonb(jv)) => { + assert_eq!( + jv, + JsonbVal::from(json!({ + "@type": "type.googleapis.com/test.StringValue", + "value": "John Doe" + })) + ); } + _ => panic!("Expected ScalarImpl::Jsonb"), } Ok(()) @@ -818,48 +818,35 @@ mod test { println!("Current conf: {:#?}", conf); println!("---------------------------"); - let value = + let message = DynamicMessage::decode(conf.message_descriptor.clone(), ANY_GEN_PROTO_DATA_1).unwrap(); - println!("Current Value: {:#?}", value); + println!("Current Value: {:#?}", message); println!("---------------------------"); - // This is of no use - let field = value.fields().next().unwrap().0; + let field = conf + .message_descriptor + .get_field_by_name("any_value") + .unwrap(); + let value = message.get_field(&field); - if let Some(ret) = from_protobuf_value(&field, &Value::Message(value)) + let ret = from_protobuf_value(&field, &value, &DataType::Jsonb) .unwrap() - .to_owned_datum() - { - println!("Decoded Value for ANY_GEN_PROTO_DATA: {:#?}", ret); - println!("---------------------------"); - - let ScalarImpl::Struct(struct_value) = ret else { - panic!("Expected ScalarImpl::Struct"); - }; - - let fields = struct_value.fields(); - - match fields[0].clone() { - Some(ScalarImpl::Int32(v)) => { - println!("Successfully decode field[0]"); - assert_eq!(v, 12345); - } - _ => panic!("Expected ScalarImpl::Int32"), - } + .to_owned_datum(); + println!("Decoded Value for ANY_GEN_PROTO_DATA: {:#?}", ret); + println!("---------------------------"); - match fields[1].clone() { - Some(ScalarImpl::Jsonb(jv)) => { - assert_eq!( - jv, - JsonbVal::from(json!({ - "@type": "type.googleapis.com/test.Int32Value", - "value": 114514 - })) - ); - } - _ => panic!("Expected ScalarImpl::Jsonb"), + match ret { + Some(ScalarImpl::Jsonb(jv)) => { + assert_eq!( + jv, + JsonbVal::from(json!({ + "@type": "type.googleapis.com/test.Int32Value", + "value": 114514 + })) + ); } + _ => panic!("Expected ScalarImpl::Jsonb"), } Ok(()) @@ -888,58 +875,45 @@ mod test { println!("Current conf: {:#?}", conf); println!("---------------------------"); - let value = DynamicMessage::decode( + let message = DynamicMessage::decode( conf.message_descriptor.clone(), ANY_RECURSIVE_GEN_PROTO_DATA, ) .unwrap(); - println!("Current Value: {:#?}", value); + println!("Current Value: {:#?}", message); println!("---------------------------"); - // This is of no use - let field = value.fields().next().unwrap().0; + let field = conf + .message_descriptor + .get_field_by_name("any_value") + .unwrap(); + let value = message.get_field(&field); - if let Some(ret) = from_protobuf_value(&field, &Value::Message(value)) + let ret = from_protobuf_value(&field, &value, &DataType::Jsonb) .unwrap() - .to_owned_datum() - { - println!("Decoded Value for ANY_RECURSIVE_GEN_PROTO_DATA: {:#?}", ret); - println!("---------------------------"); - - let ScalarImpl::Struct(struct_value) = ret else { - panic!("Expected ScalarImpl::Struct"); - }; - - let fields = struct_value.fields(); - - match fields[0].clone() { - Some(ScalarImpl::Int32(v)) => { - println!("Successfully decode field[0]"); - assert_eq!(v, 12345); - } - _ => panic!("Expected ScalarImpl::Int32"), - } + .to_owned_datum(); + println!("Decoded Value for ANY_RECURSIVE_GEN_PROTO_DATA: {:#?}", ret); + println!("---------------------------"); - match fields[1].clone() { - Some(ScalarImpl::Jsonb(jv)) => { - assert_eq!( - jv, - JsonbVal::from(json!({ - "@type": "type.googleapis.com/test.AnyValue", - "anyValue1": { - "@type": "type.googleapis.com/test.StringValue", - "value": "114514", - }, - "anyValue2": { - "@type": "type.googleapis.com/test.Int32Value", - "value": 114514, - } - })) - ); - } - _ => panic!("Expected ScalarImpl::Jsonb"), + match ret { + Some(ScalarImpl::Jsonb(jv)) => { + assert_eq!( + jv, + JsonbVal::from(json!({ + "@type": "type.googleapis.com/test.AnyValue", + "anyValue1": { + "@type": "type.googleapis.com/test.StringValue", + "value": "114514", + }, + "anyValue2": { + "@type": "type.googleapis.com/test.Int32Value", + "value": 114514, + } + })) + ); } + _ => panic!("Expected ScalarImpl::Jsonb"), } Ok(()) @@ -956,14 +930,17 @@ mod test { async fn test_any_invalid() -> crate::error::ConnectorResult<()> { let conf = create_recursive_pb_parser_config("/any-schema.pb", "test.TestAny").await; - let value = + let message = DynamicMessage::decode(conf.message_descriptor.clone(), ANY_GEN_PROTO_DATA_INVALID) .unwrap(); - // The top-level `Value` is not a proto field, but we need a dummy one. - let field = value.fields().next().unwrap().0; + let field = conf + .message_descriptor + .get_field_by_name("any_value") + .unwrap(); + let value = message.get_field(&field); - let err = from_protobuf_value(&field, &Value::Message(value)).unwrap_err(); + let err = from_protobuf_value(&field, &value, &DataType::Jsonb).unwrap_err(); let expected = expect_test::expect![[r#" Fail to convert protobuf Any into jsonb diff --git a/src/connector/src/parser/unified/protobuf.rs b/src/connector/src/parser/unified/protobuf.rs index b1d34746b5029..3ebeebca44373 100644 --- a/src/connector/src/parser/unified/protobuf.rs +++ b/src/connector/src/parser/unified/protobuf.rs @@ -32,14 +32,15 @@ impl ProtobufAccess { pub fn new(message: DynamicMessage) -> Self { Self { message } } + + #[cfg(test)] + pub fn descriptor(&self) -> prost_reflect::MessageDescriptor { + self.message.descriptor() + } } impl Access for ProtobufAccess { - fn access<'a>( - &'a self, - path: &[&str], - _type_expected: &DataType, - ) -> AccessResult> { + fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult> { debug_assert_eq!(1, path.len()); let field_desc = self .message @@ -55,10 +56,10 @@ impl Access for ProtobufAccess { })?; match self.message.get_field(&field_desc) { - Cow::Borrowed(value) => from_protobuf_value(&field_desc, value), + Cow::Borrowed(value) => from_protobuf_value(&field_desc, value, type_expected), // `Owned` variant occurs only if there's no such field and the default value is returned. - Cow::Owned(value) => from_protobuf_value(&field_desc, &value) + Cow::Owned(value) => from_protobuf_value(&field_desc, &value, type_expected) // enforce `Owned` variant to avoid returning a reference to a temporary value .map(|d| d.to_owned_datum().into()), } diff --git a/src/connector/src/source/pulsar/mod.rs b/src/connector/src/source/pulsar/mod.rs index 5d6d111b13bff..ffbc3be495bf9 100644 --- a/src/connector/src/source/pulsar/mod.rs +++ b/src/connector/src/source/pulsar/mod.rs @@ -74,6 +74,16 @@ pub struct PulsarProperties { #[serde(rename = "iceberg.bucket", default)] pub iceberg_bucket: Option, + /// Specify a custom consumer group id prefix for the source. + /// Defaults to `rw-consumer`. + /// + /// Notes: + /// - Each job (materialized view) will have multiple subscriptions and + /// contains a generated suffix in the subscription name. + /// The subscription name will be `{subscription_name_prefix}-{fragment_id}-{actor_id}`. + #[serde(rename = "subscription.name.prefix")] + pub subscription_name_prefix: Option, + #[serde(flatten)] pub unknown_fields: HashMap, } diff --git a/src/connector/src/source/pulsar/source/reader.rs b/src/connector/src/source/pulsar/source/reader.rs index 212c459388b25..20f6872474e88 100644 --- a/src/connector/src/source/pulsar/source/reader.rs +++ b/src/connector/src/source/pulsar/source/reader.rs @@ -42,6 +42,8 @@ use crate::source::{ SplitMetaData, SplitReader, }; +const PULSAR_DEFAULT_SUBSCRIPTION_PREFIX: &str = "rw-consumer"; + pub enum PulsarSplitReader { Broker(PulsarBrokerReader), Iceberg(PulsarIcebergReader), @@ -174,8 +176,12 @@ impl SplitReader for PulsarBrokerReader { .with_topic(&topic) .with_subscription_type(SubType::Exclusive) .with_subscription(format!( - "rw-consumer-{}-{}", - source_ctx.fragment_id, source_ctx.actor_id + "{}-{}-{}", + props + .subscription_name_prefix + .unwrap_or(PULSAR_DEFAULT_SUBSCRIPTION_PREFIX.to_string()), + source_ctx.fragment_id, + source_ctx.actor_id )); let builder = match split.start_offset.clone() { diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index 695a2aeaa1c14..c54dce97ad1cd 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -1016,6 +1016,17 @@ PulsarProperties: field_type: String required: false default: Default::default + - name: subscription.name.prefix + field_type: String + comments: |- + Specify a custom consumer group id prefix for the source. + Defaults to `rw-consumer`. + + Notes: + - Each job (materialized view) will have multiple subscriptions and + contains a generated suffix in the subscription name. + The subscription name will be `{subscription_name_prefix}-{fragment_id}-{actor_id}`. + required: false S3Properties: fields: - name: s3.region_name