Skip to content

Commit

Permalink
feat: support more type for json parser in source
Browse files Browse the repository at this point in the history
* add date support for json parser

* format code & add ut for date typr in json parser

* format code

* fix clippy check

* fix build fail

* Result<T, E>::ok() to convert Result to Option

Co-authored-by: tabVersion <[email protected]>
  • Loading branch information
tabVersion and tabVersion authored Dec 29, 2021
1 parent 8f200aa commit a225fc8
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 30 deletions.
85 changes: 60 additions & 25 deletions rust/source/src/parser/common.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,72 @@
use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::{DataTypeKind, Decimal, ScalarImpl, ScalarRef};
use risingwave_common::vector_op::cast::str_to_date;
use serde_json::Value;

use crate::SourceColumnDesc;

macro_rules! make_ScalarImpl {
($x:expr, $y:expr) => {
match $x {
Some(v) => return Ok($y(v)),
None => return Err(RwError::from(InternalError("json parse error".to_string()))),
}
};
}

pub(crate) fn json_parse_value(
column: &SourceColumnDesc,
value: Option<&Value>,
) -> Option<ScalarImpl> {
) -> Result<ScalarImpl> {
match column.data_type.data_type_kind() {
DataTypeKind::Boolean => value
.and_then(|v| v.as_bool())
.map(|v| ScalarImpl::Bool(v as bool)),
DataTypeKind::Int16 => value
.and_then(|v| v.as_i64())
.map(|v| ScalarImpl::Int16(v as i16)),
DataTypeKind::Int32 => value
.and_then(|v| v.as_i64())
.map(|v| ScalarImpl::Int32(v as i32)),
DataTypeKind::Int64 => value
.and_then(|v| v.as_i64())
.map(|v| ScalarImpl::Int64(v as i64)),
DataTypeKind::Float32 => value
.and_then(|v| v.as_f64())
.map(|v| ScalarImpl::Float32((v as f32).into())),
DataTypeKind::Float64 => value
.and_then(|v| v.as_f64())
.map(|v| ScalarImpl::Float64(v.into())),
DataTypeKind::Decimal => value
.and_then(|v| v.as_u64())
.map(|v| ScalarImpl::Decimal(Decimal::from(v))),
DataTypeKind::Char | DataTypeKind::Varchar => value
.and_then(|v| v.as_str())
.map(|v| ScalarImpl::Utf8(v.to_owned_scalar())),
DataTypeKind::Boolean => {
make_ScalarImpl!(value.and_then(|v| v.as_bool()), |x| ScalarImpl::Bool(
x as bool
))
}
DataTypeKind::Int16 => {
make_ScalarImpl!(value.and_then(|v| v.as_i64()), |x| ScalarImpl::Int16(
x as i16
))
}
DataTypeKind::Int32 => {
make_ScalarImpl!(value.and_then(|v| v.as_i64()), |x| ScalarImpl::Int32(
x as i32
))
}
DataTypeKind::Int64 => {
make_ScalarImpl!(value.and_then(|v| v.as_i64()), |x| ScalarImpl::Int64(
x as i64
))
}
DataTypeKind::Float32 => {
make_ScalarImpl!(value.and_then(|v| v.as_f64()), |v| ScalarImpl::Float32(
(v as f32).into()
))
}
DataTypeKind::Float64 => {
make_ScalarImpl!(
value.and_then(|v| v.as_f64()),
|v: f64| ScalarImpl::Float64(v.into())
)
}
DataTypeKind::Decimal => {
make_ScalarImpl!(value.and_then(|v| v.as_u64()), |v| ScalarImpl::Decimal(
Decimal::from(v)
))
}
DataTypeKind::Char | DataTypeKind::Varchar => make_ScalarImpl!(
value.and_then(|v| v.as_str()),
|v: &str| ScalarImpl::Utf8(v.to_owned_scalar())
),
DataTypeKind::Date => match value.and_then(|v| v.as_str()) {
None => Err(RwError::from(InternalError("parse error".to_string()))),
Some(date_str) => match str_to_date(date_str) {
Ok(date) => Ok(ScalarImpl::Int32(date as i32)),
Err(e) => Err(e),
},
},
_ => unimplemented!(),
}
}
2 changes: 1 addition & 1 deletion rust/source/src/parser/debezium/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl DebeziumJsonParser {
if column.skip_parse {
None
} else {
json_parse_value(column, map.get(&column.name))
json_parse_value(column, map.get(&column.name)).ok()
}
})
.collect::<Vec<Datum>>()
Expand Down
17 changes: 13 additions & 4 deletions rust/source/src/parser/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl SourceParser for JSONParser {
if column.skip_parse {
None
} else {
json_parse_value(column, value.get(&column.name))
json_parse_value(column, value.get(&column.name)).ok()
}
})
.collect::<Vec<Datum>>()],
Expand All @@ -35,17 +35,18 @@ impl SourceParser for JSONParser {
#[cfg(test)]
mod tests {
use risingwave_common::types::{
BoolType, DataTypeKind, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type,
ScalarImpl, StringType,
BoolType, DataTypeKind, DateType, Float32Type, Float64Type, Int16Type, Int32Type,
Int64Type, ScalarImpl, StringType,
};
use risingwave_common::vector_op::cast::str_to_date;

use crate::{JSONParser, SourceColumnDesc, SourceParser};

#[test]
fn test_json_parser() {
let parser = JSONParser {};

let payload = r#"{"i32":1,"char":"char","bool":true,"i16":1,"i64":12345678,"f32":1.23,"f64":1.2345,"varchar":"varchar"}"#.as_bytes();
let payload = r#"{"i32":1,"char":"char","bool":true,"i16":1,"i64":12345678,"f32":1.23,"f64":1.2345,"varchar":"varchar","date":"2021-01-01"}"#.as_bytes();
let descs = vec![
SourceColumnDesc {
name: "i32".to_string(),
Expand Down Expand Up @@ -103,6 +104,13 @@ mod tests {
skip_parse: false,
is_primary: false,
},
SourceColumnDesc {
name: "date".to_string(),
data_type: DateType::create(false),
column_id: 8,
skip_parse: false,
is_primary: false,
},
];

let result = parser.parse(payload, &descs);
Expand All @@ -118,6 +126,7 @@ mod tests {
assert!(row[5].eq(&Some(ScalarImpl::Float32(1.23.into()))));
assert!(row[6].eq(&Some(ScalarImpl::Float64(1.2345.into()))));
assert!(row[7].eq(&Some(ScalarImpl::Utf8("varchar".to_string()))));
assert!(row[8].eq(&Some(ScalarImpl::Int32(str_to_date("2021-01-01").unwrap()))));

let payload = r#"{"i32":1}"#.as_bytes();
let result = parser.parse(payload, &descs);
Expand Down

0 comments on commit a225fc8

Please sign in to comment.