Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(deps): Bump simd-json from 0.10.6 to 0.11.1 & fix error message #12550

Merged
merged 3 commits into from
Oct 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ serde = { version = "1", features = ["derive", "rc"] }
serde_derive = "1"
serde_json = "1"
serde_with = { version = "3", features = ["json"] }
simd-json = "0.10.6"
simd-json = "0.11.1"
strum = "0.25"
strum_macros = "0.25"
tempfile = "3"
Expand Down
106 changes: 35 additions & 71 deletions src/connector/src/parser/debezium/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,78 +460,42 @@ mod tests {
let mut parser = build_parser(columns.clone(), Default::default()).await;

let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 2);
// i64 overflow
let data0 = br#"{"payload":{"before":null,"after":{"O_KEY":9223372036854775808,"O_BOOL":1,"O_TINY":33,"O_INT":444,"O_REAL":555.0,"O_DOUBLE":666.0},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678158055464,"transaction":null}}"#;
if let Err(e) = parser
.parse_inner(None, Some(data0.to_vec()), builder.row_writer())
.await
{
println!("{:?}", e.to_string());
} else {
panic!("the test case is expected fail");
}
// bool incorrect value
let data1 = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":2,"O_TINY":33,"O_INT":444,"O_REAL":555.0,"O_DOUBLE":666.0},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678158055464,"transaction":null}}"#;
if let Err(e) = parser
.parse_inner(None, Some(data1.to_vec()), builder.row_writer())
.await
{
println!("{:?}", e.to_string());
} else {
panic!("the test case is expected failed");
}
// i16 overflow
let data2 = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":32768,"O_INT":444,"O_REAL":555.0,"O_DOUBLE":666.0},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678158055464,"transaction":null}}"#;
if let Err(e) = parser
.parse_inner(None, Some(data2.to_vec()), builder.row_writer())
.await
{
println!("{:?}", e.to_string());
} else {
panic!("the test case is expected to fail");
}
// i32 overflow
let data3 = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":33,"O_INT":2147483648,"O_REAL":555.0,"O_DOUBLE":666.0},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678158055464,"transaction":null}}"#;
if let Err(e) = parser
.parse_inner(None, Some(data3.to_vec()), builder.row_writer())
.await
{
println!("{:?}", e.to_string());
} else {
panic!("the test case is expected to fail");
}
// float32 overflow
let data4 = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":33,"O_INT":444,"O_REAL":3.80282347E38,"O_DOUBLE":666.0},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678158055464,"transaction":null}}"#;
if let Err(e) = parser
.parse_inner(None, Some(data4.to_vec()), builder.row_writer())
.await
{
println!("{:?}", e.to_string());
} else {
panic!("the test case is expected to fail");
}
// float64 will cause debezium simd_json_parser to panic, therefore included in the next
// test case below
}

#[tokio::test]
#[should_panic]
Comment on lines -513 to -518
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't panic anymore in 0.11

async fn test2_debezium_json_parser_overflow_f64() {
let columns = vec![SourceColumnDesc::simple(
"O_DOUBLE",
DataType::Float64,
ColumnId::from(0),
)];
let mut parser = build_parser(columns.clone(), Default::default()).await;
let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 2);
let data = br#"{"payload":{"before":null,"after":{"O_DOUBLE":1.797695E308},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678174483000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":563,"row":0,"thread":3,"query":null},"op":"c","ts_ms":1678174483866,"transaction":null}}"#;
if let Err(e) = parser
.parse_inner(None, Some(data.to_vec()), builder.row_writer())
.await
{
println!("{:?}", e.to_string());
} else {
panic!("the test case is expected to fail");
let normal_values = ["111", "1", "33", "444", "555.0", "666.0"];
let overflow_values = [
"9223372036854775808",
"2",
"32768",
"2147483648",
"3.80282347E38",
"1.797695E308",
];

for i in 0..6 {
let mut values = normal_values;
values[i] = overflow_values[i];
let data = format!(
r#"{{"payload":{{"before":null,"after":{{"O_KEY":{},"O_BOOL":{},"O_TINY":{},"O_INT":{},"O_REAL":{},"O_DOUBLE":{}}},"source":{{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null}},"op":"c","ts_ms":1678158055464,"transaction":null}}}}"#,
values[0], values[1], values[2], values[3], values[4], values[5]
).as_bytes().to_vec();
let e = parser
.parse_inner(None, Some(data), builder.row_writer())
.await
.unwrap_err();
println!("{}", e);
if i < 5 {
// For other overflow, the parsing succeeds but the type conversion fails
assert!(
e.to_string().contains("AccessError: TypeError"),
"i={i}, actual error: {e}"
);
} else {
// For f64 overflow, the parsing fails
assert!(
e.to_string().contains("InvalidNumber"),
"i={i}, actual error: {e}"
);
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,12 +356,14 @@ impl SourceStreamChunkRowWriter<'_> {
}

/// Transaction control message. Currently only used by Debezium messages.
#[derive(Debug)]
pub enum TransactionControl {
Begin { id: Box<str> },
Commit { id: Box<str> },
}

/// The result of parsing a message.
#[derive(Debug)]
pub enum ParseResult {
/// Some rows are parsed and written to the [`SourceStreamChunkRowWriter`].
Rows,
Expand Down
51 changes: 25 additions & 26 deletions src/connector/src/parser/unified/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use risingwave_common::types::{
DataType, Date, Decimal, Int256, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
};
use risingwave_common::util::iter_util::ZipEqFast;
use simd_json::{BorrowedValue, TryTypeError, ValueAccess, ValueType};
use simd_json::{BorrowedValue, ValueAccess, ValueType};

use super::{Access, AccessError, AccessResult};
use crate::parser::common::json_object_get_case_insensitive;
Expand Down Expand Up @@ -157,6 +157,7 @@ impl JsonParseOptions {
got: value.value_type().to_string(),
value: value.to_string(),
};

let v: ScalarImpl = match (type_expected, value.value_type()) {
(_, ValueType::Null) => return Ok(None),
// ---- Boolean -----
Expand Down Expand Up @@ -205,7 +206,7 @@ impl JsonParseOptions {
(
Some(DataType::Int16),
ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
) => value.try_as_i16()?.into(),
) => value.try_as_i16().map_err(|_| create_error())?.into(),

(Some(DataType::Int16), ValueType::String)
if matches!(
Expand All @@ -226,7 +227,7 @@ impl JsonParseOptions {
(
Some(DataType::Int32),
ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
) => value.try_as_i32()?.into(),
) => value.try_as_i32().map_err(|_| create_error())?.into(),

(Some(DataType::Int32), ValueType::String)
if matches!(
Expand All @@ -244,11 +245,13 @@ impl JsonParseOptions {
.into()
}
// ---- Int64 -----
(None, ValueType::I64 | ValueType::U64) => value.try_as_i64()?.into(),
(None, ValueType::I64 | ValueType::U64) => {
value.try_as_i64().map_err(|_| create_error())?.into()
}
(
Some(DataType::Int64),
ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
) => value.try_as_i64()?.into(),
) => value.try_as_i64().map_err(|_| create_error())?.into(),

(Some(DataType::Int64), ValueType::String)
if matches!(
Expand All @@ -270,7 +273,7 @@ impl JsonParseOptions {
Some(DataType::Float32),
ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
) if matches!(self.numeric_handling, NumericHandling::Relax { .. }) => {
(value.try_as_i64()? as f32).into()
(value.try_as_i64().map_err(|_| create_error())? as f32).into()
}
(Some(DataType::Float32), ValueType::String)
if matches!(
Expand All @@ -287,13 +290,15 @@ impl JsonParseOptions {
.map_err(|_| create_error())?
.into()
}
(Some(DataType::Float32), ValueType::F64) => value.try_as_f32()?.into(),
(Some(DataType::Float32), ValueType::F64) => {
value.try_as_f32().map_err(|_| create_error())?.into()
}
// ---- Float64 -----
(
Some(DataType::Float64),
ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
) if matches!(self.numeric_handling, NumericHandling::Relax { .. }) => {
(value.try_as_i64()? as f64).into()
(value.try_as_i64().map_err(|_| create_error())? as f64).into()
}
(Some(DataType::Float64), ValueType::String)
if matches!(
Expand All @@ -310,20 +315,24 @@ impl JsonParseOptions {
.map_err(|_| create_error())?
.into()
}
(Some(DataType::Float64) | None, ValueType::F64) => value.try_as_f64()?.into(),
(Some(DataType::Float64) | None, ValueType::F64) => {
value.try_as_f64().map_err(|_| create_error())?.into()
}
// ---- Decimal -----
(Some(DataType::Decimal) | None, ValueType::I128 | ValueType::U128) => {
Decimal::from_str(&value.try_as_i128()?.to_string())
Decimal::from_str(&value.try_as_i128().map_err(|_| create_error())?.to_string())
.map_err(|_| create_error())?
.into()
}
(Some(DataType::Decimal), ValueType::I64 | ValueType::U64) => {
Decimal::from(value.try_as_i64()?).into()
Decimal::from(value.try_as_i64().map_err(|_| create_error())?).into()
}

(Some(DataType::Decimal), ValueType::F64) => Decimal::try_from(value.try_as_f64()?)
.map_err(|_| create_error())?
.into(),
(Some(DataType::Decimal), ValueType::F64) => {
Decimal::try_from(value.try_as_f64().map_err(|_| create_error())?)
.map_err(|_| create_error())?
.into()
}

(Some(DataType::Decimal), ValueType::String) => ScalarImpl::Decimal(
Decimal::from_str(value.as_str().unwrap()).map_err(|_err| create_error())?,
Expand Down Expand Up @@ -353,7 +362,7 @@ impl JsonParseOptions {
(
Some(DataType::Date),
ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
) => Date::with_days_since_unix_epoch(value.try_as_i32()?)
) => Date::with_days_since_unix_epoch(value.try_as_i32().map_err(|_| create_error())?)
.map_err(|_| create_error())?
.into(),
(Some(DataType::Date), ValueType::String) => value
Expand Down Expand Up @@ -526,7 +535,7 @@ impl JsonParseOptions {
(
Some(DataType::Int256),
ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
) => Int256::from(value.try_as_i64()?).into(),
) => Int256::from(value.try_as_i64().map_err(|_| create_error())?).into(),

(Some(DataType::Int256), ValueType::String) => {
Int256::from_str(value.as_str().unwrap())
Expand Down Expand Up @@ -579,13 +588,3 @@ where
self.options.parse(value, type_expected)
}
}

impl From<TryTypeError> for AccessError {
fn from(value: TryTypeError) -> Self {
AccessError::TypeError {
expected: value.expected.to_string(),
got: value.expected.to_string(),
value: Default::default(),
}
}
}
Loading