Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: detect cycle ref in proto #10499

Merged
merged 6 commits into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 47 additions & 5 deletions src/connector/src/parser/protobuf/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,13 @@ impl ProtobufParserConfig {
pub fn map_to_columns(&self) -> Result<Vec<ColumnDesc>> {
let mut columns = Vec::with_capacity(self.message_descriptor.fields().len());
let mut index = 0;
let mut parse_trace: Vec<String> = vec![];
for field in self.message_descriptor.fields() {
columns.push(Self::pb_field_to_col_desc(&field, &mut index)?);
columns.push(Self::pb_field_to_col_desc(
&field,
&mut index,
&mut parse_trace,
)?);
}

Ok(columns)
Expand All @@ -134,14 +139,15 @@ impl ProtobufParserConfig {
fn pb_field_to_col_desc(
field_descriptor: &FieldDescriptor,
index: &mut i32,
parse_trace: &mut Vec<String>,
) -> Result<ColumnDesc> {
let field_type = protobuf_type_mapping(field_descriptor)?;
let field_type = protobuf_type_mapping(field_descriptor, parse_trace)?;
if let Kind::Message(m) = field_descriptor.kind() {
let field_descs = if let DataType::List { .. } = field_type {
vec![]
} else {
m.fields()
.map(|f| Self::pb_field_to_col_desc(&f, index))
.map(|f| Self::pb_field_to_col_desc(&f, index, parse_trace))
.collect::<Result<Vec<_>>>()?
};
*index += 1;
Expand Down Expand Up @@ -219,6 +225,20 @@ impl ProtobufParser {
}
}

fn detect_loop_and_push(trace: &mut Vec<String>, fd: &FieldDescriptor) -> Result<()> {
let identifier = format!("{}({})", fd.name(), fd.full_name());
if trace.iter().any(|s| s == identifier.as_str()) {
return Err(RwError::from(ProtocolError(format!(
"circular reference detected: {}, conflict with {}, kind {:?}",
trace.iter().join("->"),
identifier,
fd.kind(),
))));
}
trace.push(identifier);
Ok(())
}

Comment on lines +228 to +241
Copy link
Contributor

@neverchanje neverchanje Jun 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's cool to use an O(N^2) algorithm in this case. But I am worrying if it's possible for users to import a 1k-fields message in the system, which may cause the memory usage suddenly increases.

Given that all the previous types before the current DFS point should be unique, maybe you can use a set instead of a vector:

fn detect_loop_and_push(visited: &mut HashSet<String>, fd: &FieldDescriptor) -> Result<()> {
  if visited.contains(fd.name()) {
     return Err(...)
  }
  visited.insert(fd.name());
}

fn protobuf_type_mapping(
    field_descriptor: &FieldDescriptor,
    parse_trace: &mut HashSet<String>,
) -> Result<DataType> {
    detect_loop_and_push(parse_trace, field_descriptor)?;

    ...

    parse_trace.erase(field_descriptor.name())
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I use a vec displaying the parser visiting path in order to help to identify the recursive ref occurs at which level.
push and pop op always happen at the end of the vec, there should be little overhead for allocation

impl ByteStreamSourceParser for ProtobufParser {
fn columns(&self) -> &[SourceColumnDesc] {
&self.rw_columns
Expand Down Expand Up @@ -302,7 +322,11 @@ fn from_protobuf_value(field_desc: &FieldDescriptor, value: &Value) -> Result<Da
}

/// Maps protobuf type to RW type.
fn protobuf_type_mapping(field_descriptor: &FieldDescriptor) -> Result<DataType> {
fn protobuf_type_mapping(
field_descriptor: &FieldDescriptor,
parse_trace: &mut Vec<String>,
) -> Result<DataType> {
detect_loop_and_push(parse_trace, field_descriptor)?;
let field_type = field_descriptor.kind();
let mut t = match field_type {
Kind::Bool => DataType::Boolean,
Expand All @@ -317,7 +341,7 @@ fn protobuf_type_mapping(field_descriptor: &FieldDescriptor) -> Result<DataType>
Kind::Message(m) => {
let fields = m
.fields()
.map(|f| protobuf_type_mapping(&f))
.map(|f| protobuf_type_mapping(&f, parse_trace))
.collect::<Result<Vec<_>>>()?;
let field_names = m.fields().map(|f| f.name().to_string()).collect_vec();
DataType::new_struct(fields, field_names)
Expand All @@ -334,6 +358,7 @@ fn protobuf_type_mapping(field_descriptor: &FieldDescriptor) -> Result<DataType>
if field_descriptor.cardinality() == Cardinality::Repeated {
t = DataType::List(Box::new(t))
}
_ = parse_trace.pop();
Ok(t)
}

Expand Down Expand Up @@ -456,4 +481,21 @@ mod test {
);
Ok(())
}

#[tokio::test]
async fn test_refuse_recursive_proto_message() {
let location = schema_dir() + "/proto_recursive/recursive.pb";
let message_name = "recursive.ComplexRecursiveMessage";
let conf = ProtobufParserConfig::new(&HashMap::new(), &location, message_name, false)
.await
.unwrap();
let columns = conf.map_to_columns();
// expect error message:
// "Err(Protocol error: circular reference detected:
// parent(recursive.ComplexRecursiveMessage.parent)->siblings(recursive.
// ComplexRecursiveMessage.Parent.siblings), conflict with
// parent(recursive.ComplexRecursiveMessage.parent), kind
// recursive.ComplexRecursiveMessage.Parent"
assert!(columns.is_err());
}
}
20 changes: 20 additions & 0 deletions src/connector/src/test_data/proto_recursive/recursive.pb
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@

�
recursive.proto recursive"�
ComplexRecursiveMessage
node_name ( RnodeName
node_id (RnodeIdM
Comment on lines +2 to +6
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the compiled result? Might be better to git ignore it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the proto dep requires the compiled ver

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean can we compile it when testing? Seem we don't have any other test that depends on the hard coded compile output.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never mind, not a big deal. We can always handle it later.

Copy link
Contributor Author

@tabVersion tabVersion Jun 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean can we compile it when testing? Seem we don't have any other test that depends on the hard coded compile output.

There are only a few generated protobuf binary, I think it is not worthy introducing extra scripts and toolchain.


attributes ( 2-.recursive.ComplexRecursiveMessage.AttributesR
attributesA
parent ( 2).recursive.ComplexRecursiveMessage.ParentRparent>
children ( 2".recursive.ComplexRecursiveMessageRchildren4

Attributes
key ( Rkey
value ( Rvalue�
Parent
parent_name ( R
parentName
parent_id (RparentId>
siblings ( 2".recursive.ComplexRecursiveMessageRsiblingsbproto3
25 changes: 25 additions & 0 deletions src/connector/src/test_data/proto_recursive/recursive.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
syntax = "proto3";

package recursive;

message ComplexRecursiveMessage {
string node_name = 1;
int32 node_id = 2;

message Attributes {
string key = 1;
string value = 2;
}

repeated Attributes attributes = 3;

message Parent {
string parent_name = 1;
int32 parent_id = 2;
repeated ComplexRecursiveMessage siblings = 3;
}

Parent parent = 4;
Comment on lines +16 to +22
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a practical example? Seems we're storing the nodes multiple times since protobuf is by-value instead of by-ref. 😂

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the example is generated by chatgpt. real world ones are welcomed

repeated ComplexRecursiveMessage children = 5;
}