diff --git a/Cargo.lock b/Cargo.lock index 01d09a424b84..e6f67b697d3a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8943,10 +8943,11 @@ dependencies = [ [[package]] name = "simd-json" -version = "0.10.6" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de7f1293f0e4e11d52e588766fe9de8caa2857ff63809d40de83245452ca7c5c" +checksum = "474b451aaac1828ed12f6454a80fe58b940ae2998d10389d41533940a6f641bf" dependencies = [ + "getrandom", "halfbrown", "lexical-core", "serde", diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 9c5d4804fec6..6e228e8c8891 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -102,7 +102,7 @@ serde = { version = "1", features = ["derive", "rc"] } serde_derive = "1" serde_json = "1" serde_with = { version = "3", features = ["json"] } -simd-json = "0.10.6" +simd-json = "0.11.1" strum = "0.25" strum_macros = "0.25" tempfile = "3" diff --git a/src/connector/src/parser/debezium/simd_json_parser.rs b/src/connector/src/parser/debezium/simd_json_parser.rs index 0bfd69a7bb6f..40befaf0f60e 100644 --- a/src/connector/src/parser/debezium/simd_json_parser.rs +++ b/src/connector/src/parser/debezium/simd_json_parser.rs @@ -460,78 +460,42 @@ mod tests { let mut parser = build_parser(columns.clone(), Default::default()).await; let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 2); - // i64 overflow - let data0 = br#"{"payload":{"before":null,"after":{"O_KEY":9223372036854775808,"O_BOOL":1,"O_TINY":33,"O_INT":444,"O_REAL":555.0,"O_DOUBLE":666.0},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678158055464,"transaction":null}}"#; - if let Err(e) = parser - .parse_inner(None, Some(data0.to_vec()), builder.row_writer()) - .await - { - println!("{:?}", e.to_string()); - } else { - panic!("the test case is expected fail"); - } - // bool incorrect value - let data1 = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":2,"O_TINY":33,"O_INT":444,"O_REAL":555.0,"O_DOUBLE":666.0},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678158055464,"transaction":null}}"#; - if let Err(e) = parser - .parse_inner(None, Some(data1.to_vec()), builder.row_writer()) - .await - { - println!("{:?}", e.to_string()); - } else { - panic!("the test case is expected failed"); - } - // i16 overflow - let data2 = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":32768,"O_INT":444,"O_REAL":555.0,"O_DOUBLE":666.0},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678158055464,"transaction":null}}"#; - if let Err(e) = parser - .parse_inner(None, Some(data2.to_vec()), builder.row_writer()) - .await - { - println!("{:?}", e.to_string()); - } else { - panic!("the test case is expected to fail"); - } - // i32 overflow - let data3 = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":33,"O_INT":2147483648,"O_REAL":555.0,"O_DOUBLE":666.0},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678158055464,"transaction":null}}"#; - if let Err(e) = parser - .parse_inner(None, Some(data3.to_vec()), builder.row_writer()) - .await - { - println!("{:?}", e.to_string()); - } else { - panic!("the test case is expected to fail"); - } - // float32 overflow - let data4 = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":33,"O_INT":444,"O_REAL":3.80282347E38,"O_DOUBLE":666.0},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678158055464,"transaction":null}}"#; - if let Err(e) = parser - .parse_inner(None, Some(data4.to_vec()), builder.row_writer()) - .await - { - println!("{:?}", e.to_string()); - } else { - panic!("the test case is expected to fail"); - } - // float64 will cause debezium simd_json_parser to panic, therefore included in the next - // test case below - } - #[tokio::test] - #[should_panic] - async fn test2_debezium_json_parser_overflow_f64() { - let columns = vec![SourceColumnDesc::simple( - "O_DOUBLE", - DataType::Float64, - ColumnId::from(0), - )]; - let mut parser = build_parser(columns.clone(), Default::default()).await; - let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 2); - let data = br#"{"payload":{"before":null,"after":{"O_DOUBLE":1.797695E308},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678174483000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":563,"row":0,"thread":3,"query":null},"op":"c","ts_ms":1678174483866,"transaction":null}}"#; - if let Err(e) = parser - .parse_inner(None, Some(data.to_vec()), builder.row_writer()) - .await - { - println!("{:?}", e.to_string()); - } else { - panic!("the test case is expected to fail"); + let normal_values = ["111", "1", "33", "444", "555.0", "666.0"]; + let overflow_values = [ + "9223372036854775808", + "2", + "32768", + "2147483648", + "3.80282347E38", + "1.797695E308", + ]; + + for i in 0..6 { + let mut values = normal_values; + values[i] = overflow_values[i]; + let data = format!( + r#"{{"payload":{{"before":null,"after":{{"O_KEY":{},"O_BOOL":{},"O_TINY":{},"O_INT":{},"O_REAL":{},"O_DOUBLE":{}}},"source":{{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null}},"op":"c","ts_ms":1678158055464,"transaction":null}}}}"#, + values[0], values[1], values[2], values[3], values[4], values[5] + ).as_bytes().to_vec(); + let e = parser + .parse_inner(None, Some(data), builder.row_writer()) + .await + .unwrap_err(); + println!("{}", e); + if i < 5 { + // For other overflow, the parsing succeeds but the type conversion fails + assert!( + e.to_string().contains("AccessError: TypeError"), + "i={i}, actual error: {e}" + ); + } else { + // For f64 overflow, the parsing fails + assert!( + e.to_string().contains("InvalidNumber"), + "i={i}, actual error: {e}" + ); + } } } } diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 8c7914fe1635..29a1ecd63c65 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -356,12 +356,14 @@ impl SourceStreamChunkRowWriter<'_> { } /// Transaction control message. Currently only used by Debezium messages. +#[derive(Debug)] pub enum TransactionControl { Begin { id: Box }, Commit { id: Box }, } /// The result of parsing a message. +#[derive(Debug)] pub enum ParseResult { /// Some rows are parsed and written to the [`SourceStreamChunkRowWriter`]. Rows, diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index 433b7e5f7430..79590749351d 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -23,7 +23,7 @@ use risingwave_common::types::{ DataType, Date, Decimal, Int256, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz, }; use risingwave_common::util::iter_util::ZipEqFast; -use simd_json::{BorrowedValue, TryTypeError, ValueAccess, ValueType}; +use simd_json::{BorrowedValue, ValueAccess, ValueType}; use super::{Access, AccessError, AccessResult}; use crate::parser::common::json_object_get_case_insensitive; @@ -157,6 +157,7 @@ impl JsonParseOptions { got: value.value_type().to_string(), value: value.to_string(), }; + let v: ScalarImpl = match (type_expected, value.value_type()) { (_, ValueType::Null) => return Ok(None), // ---- Boolean ----- @@ -205,7 +206,7 @@ impl JsonParseOptions { ( Some(DataType::Int16), ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128, - ) => value.try_as_i16()?.into(), + ) => value.try_as_i16().map_err(|_| create_error())?.into(), (Some(DataType::Int16), ValueType::String) if matches!( @@ -226,7 +227,7 @@ impl JsonParseOptions { ( Some(DataType::Int32), ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128, - ) => value.try_as_i32()?.into(), + ) => value.try_as_i32().map_err(|_| create_error())?.into(), (Some(DataType::Int32), ValueType::String) if matches!( @@ -244,11 +245,13 @@ impl JsonParseOptions { .into() } // ---- Int64 ----- - (None, ValueType::I64 | ValueType::U64) => value.try_as_i64()?.into(), + (None, ValueType::I64 | ValueType::U64) => { + value.try_as_i64().map_err(|_| create_error())?.into() + } ( Some(DataType::Int64), ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128, - ) => value.try_as_i64()?.into(), + ) => value.try_as_i64().map_err(|_| create_error())?.into(), (Some(DataType::Int64), ValueType::String) if matches!( @@ -270,7 +273,7 @@ impl JsonParseOptions { Some(DataType::Float32), ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128, ) if matches!(self.numeric_handling, NumericHandling::Relax { .. }) => { - (value.try_as_i64()? as f32).into() + (value.try_as_i64().map_err(|_| create_error())? as f32).into() } (Some(DataType::Float32), ValueType::String) if matches!( @@ -287,13 +290,15 @@ impl JsonParseOptions { .map_err(|_| create_error())? .into() } - (Some(DataType::Float32), ValueType::F64) => value.try_as_f32()?.into(), + (Some(DataType::Float32), ValueType::F64) => { + value.try_as_f32().map_err(|_| create_error())?.into() + } // ---- Float64 ----- ( Some(DataType::Float64), ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128, ) if matches!(self.numeric_handling, NumericHandling::Relax { .. }) => { - (value.try_as_i64()? as f64).into() + (value.try_as_i64().map_err(|_| create_error())? as f64).into() } (Some(DataType::Float64), ValueType::String) if matches!( @@ -310,20 +315,24 @@ impl JsonParseOptions { .map_err(|_| create_error())? .into() } - (Some(DataType::Float64) | None, ValueType::F64) => value.try_as_f64()?.into(), + (Some(DataType::Float64) | None, ValueType::F64) => { + value.try_as_f64().map_err(|_| create_error())?.into() + } // ---- Decimal ----- (Some(DataType::Decimal) | None, ValueType::I128 | ValueType::U128) => { - Decimal::from_str(&value.try_as_i128()?.to_string()) + Decimal::from_str(&value.try_as_i128().map_err(|_| create_error())?.to_string()) .map_err(|_| create_error())? .into() } (Some(DataType::Decimal), ValueType::I64 | ValueType::U64) => { - Decimal::from(value.try_as_i64()?).into() + Decimal::from(value.try_as_i64().map_err(|_| create_error())?).into() } - (Some(DataType::Decimal), ValueType::F64) => Decimal::try_from(value.try_as_f64()?) - .map_err(|_| create_error())? - .into(), + (Some(DataType::Decimal), ValueType::F64) => { + Decimal::try_from(value.try_as_f64().map_err(|_| create_error())?) + .map_err(|_| create_error())? + .into() + } (Some(DataType::Decimal), ValueType::String) => ScalarImpl::Decimal( Decimal::from_str(value.as_str().unwrap()).map_err(|_err| create_error())?, @@ -353,7 +362,7 @@ impl JsonParseOptions { ( Some(DataType::Date), ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128, - ) => Date::with_days_since_unix_epoch(value.try_as_i32()?) + ) => Date::with_days_since_unix_epoch(value.try_as_i32().map_err(|_| create_error())?) .map_err(|_| create_error())? .into(), (Some(DataType::Date), ValueType::String) => value @@ -526,7 +535,7 @@ impl JsonParseOptions { ( Some(DataType::Int256), ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128, - ) => Int256::from(value.try_as_i64()?).into(), + ) => Int256::from(value.try_as_i64().map_err(|_| create_error())?).into(), (Some(DataType::Int256), ValueType::String) => { Int256::from_str(value.as_str().unwrap()) @@ -579,13 +588,3 @@ where self.options.parse(value, type_expected) } } - -impl From for AccessError { - fn from(value: TryTypeError) -> Self { - AccessError::TypeError { - expected: value.expected.to_string(), - got: value.expected.to_string(), - value: Default::default(), - } - } -}