Skip to content

Commit

Permalink
fix: parsing the header of confluent protobuf message incorrectly (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
Rossil2012 authored Mar 6, 2024
1 parent 086749f commit 8f9c063
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 3 deletions.
90 changes: 87 additions & 3 deletions src/connector/src/parser/protobuf/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::parser::unified::{
use crate::parser::util::bytes_from_url;
use crate::parser::{AccessBuilder, EncodingProperties};
use crate::schema::schema_registry::{
extract_schema_id, get_subject_by_strategy, handle_sr_list, Client,
extract_schema_id, get_subject_by_strategy, handle_sr_list, Client, WireFormatError,
};

#[derive(Debug)]
Expand Down Expand Up @@ -524,21 +524,55 @@ fn protobuf_type_mapping(
Ok(t)
}

/// A port from the implementation of confluent's Varint Zig-zag deserialization.
/// See `ReadVarint` in <https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java>
fn decode_varint_zigzag(buffer: &[u8]) -> ConnectorResult<(i32, usize)> {
// We expect the decoded number to be 4 bytes.
let mut value = 0u32;
let mut shift = 0;
let mut len = 0usize;

for &byte in buffer {
len += 1;
// The Varint encoding is limited to 5 bytes.
if len > 5 {
break;
}
// The byte is cast to u32 to avoid shifting overflow.
let byte_ext = byte as u32;
// In Varint encoding, the lowest 7 bits are used to represent number,
// while the highest zero bit indicates the end of the number with Varint encoding.
value |= (byte_ext & 0x7F) << shift;
if byte_ext & 0x80 == 0 {
return Ok((((value >> 1) as i32) ^ -((value & 1) as i32), len));
}

shift += 7;
}

Err(WireFormatError::ParseMessageIndexes.into())
}

/// Reference: <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
/// Wire format for Confluent pb header is:
/// | 0 | 1-4 | 5-x | x+1-end
/// | magic-byte | schema-id | message-indexes | protobuf-payload
pub(crate) fn resolve_pb_header(payload: &[u8]) -> ConnectorResult<&[u8]> {
// there's a message index array at the front of payload
// if it is the first message in proto def, the array is just and `0`
// TODO: support parsing more complex index array
let (_, remained) = extract_schema_id(payload)?;
// The message indexes are encoded as int using variable-length zig-zag encoding,
// prefixed by the length of the array.
// Note that if the first byte is 0, it is equivalent to (1, 0) as an optimization.
match remained.first() {
Some(0) => Ok(&remained[1..]),
Some(i) => Ok(&remained[(*i as usize)..]),
Some(_) => {
let (index_len, mut offset) = decode_varint_zigzag(remained)?;
for _ in 0..index_len {
offset += decode_varint_zigzag(&remained[offset..])?.1;
}
Ok(&remained[offset..])
}
None => bail!("The proto payload is empty"),
}
}
Expand Down Expand Up @@ -1106,4 +1140,54 @@ mod test {

Ok(())
}

#[test]
fn test_decode_varint_zigzag() {
// 1. Positive number
let buffer = vec![0x02];
let (value, len) = decode_varint_zigzag(&buffer).unwrap();
assert_eq!(value, 1);
assert_eq!(len, 1);

// 2. Negative number
let buffer = vec![0x01];
let (value, len) = decode_varint_zigzag(&buffer).unwrap();
assert_eq!(value, -1);
assert_eq!(len, 1);

// 3. Larger positive number
let buffer = vec![0x9E, 0x03];
let (value, len) = decode_varint_zigzag(&buffer).unwrap();
assert_eq!(value, 207);
assert_eq!(len, 2);

// 4. Larger negative number
let buffer = vec![0xBF, 0x07];
let (value, len) = decode_varint_zigzag(&buffer).unwrap();
assert_eq!(value, -480);
assert_eq!(len, 2);

// 5. Maximum positive number
let buffer = vec![0xFE, 0xFF, 0xFF, 0xFF, 0x0F];
let (value, len) = decode_varint_zigzag(&buffer).unwrap();
assert_eq!(value, i32::MAX);
assert_eq!(len, 5);

// 6. Maximum negative number
let buffer = vec![0xFF, 0xFF, 0xFF, 0xFF, 0x0F];
let (value, len) = decode_varint_zigzag(&buffer).unwrap();
assert_eq!(value, i32::MIN);
assert_eq!(len, 5);

// 7. More than 32 bits
let buffer = vec![0xFF, 0xFF, 0xFF, 0xFF, 0x7F];
let (value, len) = decode_varint_zigzag(&buffer).unwrap();
assert_eq!(value, i32::MIN);
assert_eq!(len, 5);

// 8. Invalid input (more than 5 bytes)
let buffer = vec![0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF];
let result = decode_varint_zigzag(&buffer);
assert!(result.is_err());
}
}
2 changes: 2 additions & 0 deletions src/connector/src/schema/schema_registry/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ pub enum WireFormatError {
NoMagic,
#[error("fail to read 4-byte schema ID")]
NoSchemaId,
#[error("failed to parse message indexes")]
ParseMessageIndexes,
}

/// extract the magic number and `schema_id` at the front of payload
Expand Down

0 comments on commit 8f9c063

Please sign in to comment.