Skip to content

Commit

Permalink
feat: message_as_jsonb to handle circle dep in protobuf (#19935) (#…
Browse files Browse the repository at this point in the history
…19987)

Signed-off-by: tabversion <[email protected]>
Co-authored-by: Bohan Zhang <[email protected]>
Co-authored-by: tabversion <[email protected]>
  • Loading branch information
3 people authored Jan 2, 2025
1 parent 336bbea commit b364088
Show file tree
Hide file tree
Showing 10 changed files with 304 additions and 42 deletions.
52 changes: 52 additions & 0 deletions e2e_test/source_inline/kafka/protobuf/opentelemetry.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
control substitution on

system ok
rpk registry schema create "opentelemetry_common.proto" --schema "/risingwave/src/connector/codec/tests/test_data/opentelemetry_common.proto"

system ok
rpk registry schema create "opentelemetry_test-value" --schema "/dev/stdin" --references opentelemetry_common.proto:opentelemetry_common.proto:1 --type protobuf << EOF
syntax = "proto3";
package opentelemetry_test;
import "opentelemetry_common.proto";
message OTLPTestMessage {
opentelemetry.proto.common.v1.AnyValue any_value = 1;
opentelemetry.proto.common.v1.KeyValueList key_value_list = 2;
opentelemetry.proto.common.v1.InstrumentationScope instrumentation_scope = 3;
}
EOF


system ok
echo '{"any_value":{"string_value":"example"},"key_value_list":{"values":[{"key":"key1","value":{"string_value":"value1"}},{"key":"key2","value":{"int_value":42}}]},"instrumentation_scope":{"name":"test-scope","version":"1.0"}}' | rpk topic produce "opentelemetry_test" --schema-id=topic --schema-type="opentelemetry_test.OTLPTestMessage" --allow-auto-topic-creation

statement ok
create table opentelemetry_test with ( ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, topic = 'opentelemetry_test' ) format plain encode protobuf ( schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}', message = 'opentelemetry_test.OTLPTestMessage', messages_as_jsonb = 'opentelemetry.proto.common.v1.ArrayValue,opentelemetry.proto.common.v1.KeyValueList,opentelemetry.proto.common.v1.AnyValue');

statement ok
flush;

sleep 1s

query T
select count(*) from opentelemetry_test;
----
1

query TTT
select any_value, key_value_list, instrumentation_scope from opentelemetry_test;
----
{"stringValue": "example"} {"values": [{"key": "key1", "value": {"stringValue": "value1"}}, {"key": "key2", "value": {"intValue": "42"}}]} (test-scope,1.0,{},0)

# ==== clean up ====

statement ok
drop table opentelemetry_test;

system ok
rpk topic delete opentelemetry_test;

system ok
rpk registry subject delete "opentelemetry_test-value"

system ok
rpk registry subject delete "opentelemetry_common.proto"
41 changes: 41 additions & 0 deletions e2e_test/source_inline/kafka/protobuf/recursive_overflow.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
control substitution on

system ok
rpk registry schema create "recursive_complex-value" --schema "/dev/stdin" --type protobuf << EOF
syntax = "proto3";
// a recursive complex type can cause stack overflow in the frontend when inferring the schema
package recursive_complex;
message AnyValue {
oneof value {
string string_value = 1;
int32 int_value = 2;
double double_value = 3;
bool bool_value = 4;
ArrayValue array_value = 5;
}
}
message ArrayValue {
AnyValue value1 = 1;
AnyValue value2 = 2;
ArrayValue array_value = 3;
}
EOF


system ok
echo '{"array_value":{"value1":{"string_value":"This is a string value"},"value2":{"int_value":42},"array_value":{"value1":{"double_value":3.14159},"value2":{"bool_value":true},"array_value":{"value1":{"string_value":"Deeply nested string"},"value2":{"int_value":100}}}}}' | rpk topic produce "recursive_complex" --schema-id=topic --schema-type="recursive_complex.AnyValue" --allow-auto-topic-creation

# the test just make sure the table can finish create process
statement ok
create table recursive_complex with ( ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, topic = 'recursive_complex' ) format plain encode protobuf ( schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}', message = 'recursive_complex.AnyValue', messages_as_jsonb = 'recursive_complex.AnyValue,recursive_complex.ArrayValue');

# ==== clean up ====

statement ok
drop table recursive_complex;

system ok
rpk topic delete recursive_complex;

system ok
rpk registry subject delete "recursive_complex-value"
26 changes: 17 additions & 9 deletions src/connector/codec/src/decoder/protobuf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

pub mod parser;
use std::borrow::Cow;
use std::collections::HashSet;
use std::sync::LazyLock;

use parser::from_protobuf_value;
Expand All @@ -24,13 +25,17 @@ use thiserror_ext::AsReport;

use super::{uncategorized, Access, AccessResult};

pub struct ProtobufAccess {
pub struct ProtobufAccess<'a> {
message: DynamicMessage,
messages_as_jsonb: &'a HashSet<String>,
}

impl ProtobufAccess {
pub fn new(message: DynamicMessage) -> Self {
Self { message }
impl<'a> ProtobufAccess<'a> {
pub fn new(message: DynamicMessage, messages_as_jsonb: &'a HashSet<String>) -> Self {
Self {
message,
messages_as_jsonb,
}
}

#[cfg(test)]
Expand All @@ -39,7 +44,7 @@ impl ProtobufAccess {
}
}

impl Access for ProtobufAccess {
impl Access for ProtobufAccess<'_> {
fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
debug_assert_eq!(1, path.len());
let field_desc = self
Expand All @@ -56,12 +61,15 @@ impl Access for ProtobufAccess {
})?;

match self.message.get_field(&field_desc) {
Cow::Borrowed(value) => from_protobuf_value(&field_desc, value, type_expected),
Cow::Borrowed(value) => {
from_protobuf_value(&field_desc, value, type_expected, self.messages_as_jsonb)
}

// `Owned` variant occurs only if there's no such field and the default value is returned.
Cow::Owned(value) => from_protobuf_value(&field_desc, &value, type_expected)
// enforce `Owned` variant to avoid returning a reference to a temporary value
.map(|d| d.to_owned_datum().into()),
Cow::Owned(value) => {
from_protobuf_value(&field_desc, &value, type_expected, self.messages_as_jsonb)
.map(|d| d.to_owned_datum().into())
}
}
}
}
68 changes: 55 additions & 13 deletions src/connector/codec/src/decoder/protobuf/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;

use anyhow::Context;
use itertools::Itertools;
use prost_reflect::{Cardinality, FieldDescriptor, Kind, MessageDescriptor, ReflectMessage, Value};
Expand All @@ -26,14 +28,22 @@ use thiserror_ext::Macro;

use crate::decoder::{uncategorized, AccessError, AccessResult};

pub const PROTOBUF_MESSAGES_AS_JSONB: &str = "messages_as_jsonb";

pub fn pb_schema_to_column_descs(
message_descriptor: &MessageDescriptor,
messages_as_jsonb: &HashSet<String>,
) -> anyhow::Result<Vec<ColumnDesc>> {
let mut columns = Vec::with_capacity(message_descriptor.fields().len());
let mut index = 0;
let mut parse_trace: Vec<String> = vec![];
for field in message_descriptor.fields() {
columns.push(pb_field_to_col_desc(&field, &mut index, &mut parse_trace)?);
columns.push(pb_field_to_col_desc(
&field,
&mut index,
&mut parse_trace,
messages_as_jsonb,
)?);
}

Ok(columns)
Expand All @@ -44,15 +54,18 @@ fn pb_field_to_col_desc(
field_descriptor: &FieldDescriptor,
index: &mut i32,
parse_trace: &mut Vec<String>,
messages_as_jsonb: &HashSet<String>,
) -> anyhow::Result<ColumnDesc> {
let field_type = protobuf_type_mapping(field_descriptor, parse_trace)
let field_type = protobuf_type_mapping(field_descriptor, parse_trace, messages_as_jsonb)
.context("failed to map protobuf type")?;
if let Kind::Message(m) = field_descriptor.kind() {
if let Kind::Message(m) = field_descriptor.kind()
&& !messages_as_jsonb.contains(m.full_name())
{
let field_descs = if let DataType::List { .. } = field_type {
vec![]
} else {
m.fields()
.map(|f| pb_field_to_col_desc(&f, index, parse_trace))
.map(|f| pb_field_to_col_desc(&f, index, parse_trace, messages_as_jsonb))
.try_collect()?
};
*index += 1;
Expand Down Expand Up @@ -92,10 +105,12 @@ fn detect_loop_and_push(
let identifier = format!("{}({})", fd.name(), fd.full_name());
if trace.iter().any(|s| s == identifier.as_str()) {
bail_protobuf_type_error!(
"circular reference detected: {}, conflict with {}, kind {:?}",
"circular reference detected: {}, conflict with {}, kind {:?}. Adding {:?} to {:?} may help.",
trace.iter().format("->"),
identifier,
fd.kind(),
fd.kind(),
PROTOBUF_MESSAGES_AS_JSONB,
);
}
trace.push(identifier);
Expand All @@ -106,6 +121,7 @@ pub fn from_protobuf_value<'a>(
field_desc: &FieldDescriptor,
value: &'a Value,
type_expected: &DataType,
messages_as_jsonb: &'a HashSet<String>,
) -> AccessResult<DatumCow<'a>> {
let kind = field_desc.kind();

Expand Down Expand Up @@ -136,7 +152,7 @@ pub fn from_protobuf_value<'a>(
ScalarImpl::Utf8(enum_symbol.name().into())
}
Value::Message(dyn_msg) => {
if dyn_msg.descriptor().full_name() == "google.protobuf.Any" {
if messages_as_jsonb.contains(dyn_msg.descriptor().full_name()) {
ScalarImpl::Jsonb(JsonbVal::from(
serde_json::to_value(dyn_msg).map_err(AccessError::ProtobufAnyToJson)?,
))
Expand All @@ -159,8 +175,13 @@ pub fn from_protobuf_value<'a>(
};
let value = dyn_msg.get_field(&field_desc);
rw_values.push(
from_protobuf_value(&field_desc, &value, expected_field_type)?
.to_owned_datum(),
from_protobuf_value(
&field_desc,
&value,
expected_field_type,
messages_as_jsonb,
)?
.to_owned_datum(),
);
}
ScalarImpl::Struct(StructValue::new(rw_values))
Expand All @@ -176,7 +197,12 @@ pub fn from_protobuf_value<'a>(
};
let mut builder = element_type.create_array_builder(values.len());
for value in values {
builder.append(from_protobuf_value(field_desc, value, element_type)?);
builder.append(from_protobuf_value(
field_desc,
value,
element_type,
messages_as_jsonb,
)?);
}
ScalarImpl::List(ListValue::new(builder.finish()))
}
Expand Down Expand Up @@ -209,11 +235,13 @@ pub fn from_protobuf_value<'a>(
&map_desc.map_entry_key_field(),
&key.clone().into(),
map_type.key(),
messages_as_jsonb,
)?);
value_builder.append(from_protobuf_value(
&map_desc.map_entry_value_field(),
value,
map_type.value(),
messages_as_jsonb,
)?);
}
let keys = key_builder.finish();
Expand All @@ -231,6 +259,7 @@ pub fn from_protobuf_value<'a>(
fn protobuf_type_mapping(
field_descriptor: &FieldDescriptor,
parse_trace: &mut Vec<String>,
messages_as_jsonb: &HashSet<String>,
) -> std::result::Result<DataType, ProtobufTypeError> {
detect_loop_and_push(parse_trace, field_descriptor)?;
let mut t = match field_descriptor.kind() {
Expand All @@ -245,20 +274,33 @@ fn protobuf_type_mapping(
Kind::Uint64 | Kind::Fixed64 => DataType::Decimal,
Kind::String => DataType::Varchar,
Kind::Message(m) => {
if m.full_name() == "google.protobuf.Any" {
if messages_as_jsonb.contains(m.full_name()) {
// Well-Known Types are identified by their full name
DataType::Jsonb
} else if m.is_map_entry() {
// Map is equivalent to `repeated MapFieldEntry map_field = N;`
debug_assert!(field_descriptor.is_map());
let key = protobuf_type_mapping(&m.map_entry_key_field(), parse_trace)?;
let value = protobuf_type_mapping(&m.map_entry_value_field(), parse_trace)?;
let key = protobuf_type_mapping(
&m.map_entry_key_field(),
parse_trace,
messages_as_jsonb,
)?;
let value = protobuf_type_mapping(
&m.map_entry_value_field(),
parse_trace,
messages_as_jsonb,
)?;
_ = parse_trace.pop();
return Ok(DataType::Map(MapType::from_kv(key, value)));
} else {
let fields = m
.fields()
.map(|f| Ok((f.name().to_owned(), protobuf_type_mapping(&f, parse_trace)?)))
.map(|f| {
Ok((
f.name().to_owned(),
protobuf_type_mapping(&f, parse_trace, messages_as_jsonb)?,
))
})
.try_collect::<_, Vec<_>, _>()?;
StructType::new(fields).into()
}
Expand Down
Loading

0 comments on commit b364088

Please sign in to comment.