From 8f9c0637706ccb7686654fea94c4178edd105062 Mon Sep 17 00:00:00 2001 From: Rossil <40714231+Rossil2012@users.noreply.github.com> Date: Wed, 6 Mar 2024 11:24:00 +0800 Subject: [PATCH] fix: parsing the header of confluent protobuf message incorrectly (#15444) --- src/connector/src/parser/protobuf/parser.rs | 90 ++++++++++++++++++- .../src/schema/schema_registry/util.rs | 2 + 2 files changed, 89 insertions(+), 3 deletions(-) diff --git a/src/connector/src/parser/protobuf/parser.rs b/src/connector/src/parser/protobuf/parser.rs index 4248fa2b7470c..b09cc24ea59e1 100644 --- a/src/connector/src/parser/protobuf/parser.rs +++ b/src/connector/src/parser/protobuf/parser.rs @@ -36,7 +36,7 @@ use crate::parser::unified::{ use crate::parser::util::bytes_from_url; use crate::parser::{AccessBuilder, EncodingProperties}; use crate::schema::schema_registry::{ - extract_schema_id, get_subject_by_strategy, handle_sr_list, Client, + extract_schema_id, get_subject_by_strategy, handle_sr_list, Client, WireFormatError, }; #[derive(Debug)] @@ -524,6 +524,35 @@ fn protobuf_type_mapping( Ok(t) } +/// A port from the implementation of confluent's Varint Zig-zag deserialization. +/// See `ReadVarint` in +fn decode_varint_zigzag(buffer: &[u8]) -> ConnectorResult<(i32, usize)> { + // We expect the decoded number to be 4 bytes. + let mut value = 0u32; + let mut shift = 0; + let mut len = 0usize; + + for &byte in buffer { + len += 1; + // The Varint encoding is limited to 5 bytes. + if len > 5 { + break; + } + // The byte is cast to u32 to avoid shifting overflow. + let byte_ext = byte as u32; + // In Varint encoding, the lowest 7 bits are used to represent number, + // while the highest zero bit indicates the end of the number with Varint encoding. + value |= (byte_ext & 0x7F) << shift; + if byte_ext & 0x80 == 0 { + return Ok((((value >> 1) as i32) ^ -((value & 1) as i32), len)); + } + + shift += 7; + } + + Err(WireFormatError::ParseMessageIndexes.into()) +} + /// Reference: /// Wire format for Confluent pb header is: /// | 0 | 1-4 | 5-x | x+1-end @@ -531,14 +560,19 @@ fn protobuf_type_mapping( pub(crate) fn resolve_pb_header(payload: &[u8]) -> ConnectorResult<&[u8]> { // there's a message index array at the front of payload // if it is the first message in proto def, the array is just and `0` - // TODO: support parsing more complex index array let (_, remained) = extract_schema_id(payload)?; // The message indexes are encoded as int using variable-length zig-zag encoding, // prefixed by the length of the array. // Note that if the first byte is 0, it is equivalent to (1, 0) as an optimization. match remained.first() { Some(0) => Ok(&remained[1..]), - Some(i) => Ok(&remained[(*i as usize)..]), + Some(_) => { + let (index_len, mut offset) = decode_varint_zigzag(remained)?; + for _ in 0..index_len { + offset += decode_varint_zigzag(&remained[offset..])?.1; + } + Ok(&remained[offset..]) + } None => bail!("The proto payload is empty"), } } @@ -1106,4 +1140,54 @@ mod test { Ok(()) } + + #[test] + fn test_decode_varint_zigzag() { + // 1. Positive number + let buffer = vec![0x02]; + let (value, len) = decode_varint_zigzag(&buffer).unwrap(); + assert_eq!(value, 1); + assert_eq!(len, 1); + + // 2. Negative number + let buffer = vec![0x01]; + let (value, len) = decode_varint_zigzag(&buffer).unwrap(); + assert_eq!(value, -1); + assert_eq!(len, 1); + + // 3. Larger positive number + let buffer = vec![0x9E, 0x03]; + let (value, len) = decode_varint_zigzag(&buffer).unwrap(); + assert_eq!(value, 207); + assert_eq!(len, 2); + + // 4. Larger negative number + let buffer = vec![0xBF, 0x07]; + let (value, len) = decode_varint_zigzag(&buffer).unwrap(); + assert_eq!(value, -480); + assert_eq!(len, 2); + + // 5. Maximum positive number + let buffer = vec![0xFE, 0xFF, 0xFF, 0xFF, 0x0F]; + let (value, len) = decode_varint_zigzag(&buffer).unwrap(); + assert_eq!(value, i32::MAX); + assert_eq!(len, 5); + + // 6. Maximum negative number + let buffer = vec![0xFF, 0xFF, 0xFF, 0xFF, 0x0F]; + let (value, len) = decode_varint_zigzag(&buffer).unwrap(); + assert_eq!(value, i32::MIN); + assert_eq!(len, 5); + + // 7. More than 32 bits + let buffer = vec![0xFF, 0xFF, 0xFF, 0xFF, 0x7F]; + let (value, len) = decode_varint_zigzag(&buffer).unwrap(); + assert_eq!(value, i32::MIN); + assert_eq!(len, 5); + + // 8. Invalid input (more than 5 bytes) + let buffer = vec![0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF]; + let result = decode_varint_zigzag(&buffer); + assert!(result.is_err()); + } } diff --git a/src/connector/src/schema/schema_registry/util.rs b/src/connector/src/schema/schema_registry/util.rs index 407534b1a5671..0d43f33baa31c 100644 --- a/src/connector/src/schema/schema_registry/util.rs +++ b/src/connector/src/schema/schema_registry/util.rs @@ -49,6 +49,8 @@ pub enum WireFormatError { NoMagic, #[error("fail to read 4-byte schema ID")] NoSchemaId, + #[error("failed to parse message indexes")] + ParseMessageIndexes, } /// extract the magic number and `schema_id` at the front of payload