Skip to content

Commit

Permalink
feat(sink): support serial type in json & avro encoder (#16969)
Browse files Browse the repository at this point in the history
Co-authored-by: Shanicky Chen <>
  • Loading branch information
shanicky authored May 29, 2024
1 parent bd6454e commit 234f657
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 4 deletions.
13 changes: 12 additions & 1 deletion src/connector/src/sink/encoder/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,10 @@ fn encode_field<D: MaybeData>(
AvroSchema::Long => maybe.on_base(|s| Ok(Value::Long(s.into_int64())))?,
_ => return no_match_err(),
},
DataType::Serial => match inner {
AvroSchema::Long => maybe.on_base(|s| Ok(Value::Long(s.into_serial().into_inner())))?,
_ => return no_match_err(),
},
DataType::Struct(st) => match inner {
AvroSchema::Record { .. } => maybe.on_struct(st, inner)?,
_ => return no_match_err(),
Expand Down Expand Up @@ -447,7 +451,7 @@ fn encode_field<D: MaybeData>(
DataType::Decimal => return no_match_err(),
DataType::Jsonb => return no_match_err(),
// Group D: unsupported
DataType::Serial | DataType::Int256 => {
DataType::Int256 => {
return no_match_err();
}
};
Expand Down Expand Up @@ -531,6 +535,13 @@ mod tests {
Value::Long(i64::MAX),
);

test_ok(
&DataType::Serial,
Some(ScalarImpl::Serial(i64::MAX.into())),
r#""long""#,
Value::Long(i64::MAX),
);

let tstz = "2018-01-26T18:30:09.453Z".parse().unwrap();
test_ok(
&DataType::Timestamptz,
Expand Down
28 changes: 25 additions & 3 deletions src/connector/src/sink/encoder/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,10 @@ fn datum_to_json_object(
(DataType::Int64, ScalarRefImpl::Int64(v)) => {
json!(v)
}
(DataType::Serial, ScalarRefImpl::Serial(v)) => {
// The serial type needs to be handled as a string to prevent primary key conflicts caused by the precision issues of JSON numbers.
json!(format!("{:#018x}", v.into_inner()))
}
(DataType::Float32, ScalarRefImpl::Float32(v)) => {
json!(f32::from(v))
}
Expand Down Expand Up @@ -402,7 +406,7 @@ pub(crate) fn schema_type_mapping(rw_type: &DataType) -> &'static str {
DataType::List(_) => "array",
DataType::Bytea => "bytes",
DataType::Jsonb => "string",
DataType::Serial => "int32",
DataType::Serial => "string",
DataType::Int256 => "string",
}
}
Expand Down Expand Up @@ -434,13 +438,13 @@ fn type_as_json_schema(rw_type: &DataType) -> Map<String, Value> {

#[cfg(test)]
mod tests {

use risingwave_common::types::{
Date, Decimal, Interval, Scalar, ScalarImpl, StructRef, StructType, StructValue, Time,
Timestamp,
};

use super::*;

#[test]
fn test_to_json_basic_type() {
let mock_field = Field {
Expand Down Expand Up @@ -498,6 +502,24 @@ mod tests {
i64::MAX.to_string()
);

let serial_value = datum_to_json_object(
&Field {
data_type: DataType::Serial,
..mock_field.clone()
},
Some(ScalarImpl::Serial(i64::MAX.into()).as_scalar_ref_impl()),
DateHandlingMode::FromCe,
TimestampHandlingMode::String,
TimestamptzHandlingMode::UtcString,
TimeHandlingMode::Milli,
&CustomJsonType::None,
)
.unwrap();
assert_eq!(
serde_json::to_string(&serial_value).unwrap(),
format!("\"{:#018x}\"", i64::MAX)
);

// https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/time/ZonedTimestamp.java
let tstz_inner = "2018-01-26T18:30:09.453Z".parse().unwrap();
let tstz_value = datum_to_json_object(
Expand Down Expand Up @@ -820,7 +842,7 @@ mod tests {
let schema = json_converter_with_schema(json!({}), "test".to_owned(), fields.iter())
["schema"]
.to_string();
let ans = r#"{"fields":[{"field":"v1","optional":true,"type":"boolean"},{"field":"v2","optional":true,"type":"int16"},{"field":"v3","optional":true,"type":"int32"},{"field":"v4","optional":true,"type":"float"},{"field":"v5","optional":true,"type":"string"},{"field":"v6","optional":true,"type":"int32"},{"field":"v7","optional":true,"type":"string"},{"field":"v8","optional":true,"type":"int64"},{"field":"v9","optional":true,"type":"string"},{"field":"v10","fields":[{"field":"a","optional":true,"type":"int64"},{"field":"b","optional":true,"type":"string"},{"field":"c","fields":[{"field":"aa","optional":true,"type":"int64"},{"field":"bb","optional":true,"type":"double"}],"optional":true,"type":"struct"}],"optional":true,"type":"struct"},{"field":"v11","items":{"items":{"fields":[{"field":"aa","optional":true,"type":"int64"},{"field":"bb","optional":true,"type":"double"}],"optional":true,"type":"struct"},"optional":true,"type":"array"},"optional":true,"type":"array"},{"field":"12","optional":true,"type":"string"},{"field":"13","optional":true,"type":"int32"},{"field":"14","optional":true,"type":"string"}],"name":"test","optional":false,"type":"struct"}"#;
let ans = r#"{"fields":[{"field":"v1","optional":true,"type":"boolean"},{"field":"v2","optional":true,"type":"int16"},{"field":"v3","optional":true,"type":"int32"},{"field":"v4","optional":true,"type":"float"},{"field":"v5","optional":true,"type":"string"},{"field":"v6","optional":true,"type":"int32"},{"field":"v7","optional":true,"type":"string"},{"field":"v8","optional":true,"type":"int64"},{"field":"v9","optional":true,"type":"string"},{"field":"v10","fields":[{"field":"a","optional":true,"type":"int64"},{"field":"b","optional":true,"type":"string"},{"field":"c","fields":[{"field":"aa","optional":true,"type":"int64"},{"field":"bb","optional":true,"type":"double"}],"optional":true,"type":"struct"}],"optional":true,"type":"struct"},{"field":"v11","items":{"items":{"fields":[{"field":"aa","optional":true,"type":"int64"},{"field":"bb","optional":true,"type":"double"}],"optional":true,"type":"struct"},"optional":true,"type":"array"},"optional":true,"type":"array"},{"field":"12","optional":true,"type":"string"},{"field":"13","optional":true,"type":"string"},{"field":"14","optional":true,"type":"string"}],"name":"test","optional":false,"type":"struct"}"#;
assert_eq!(schema, ans);
}
}

0 comments on commit 234f657

Please sign in to comment.