diff --git a/src/connector/src/sink/encoder/avro.rs b/src/connector/src/sink/encoder/avro.rs index 20d8a0997a7f..812212672729 100644 --- a/src/connector/src/sink/encoder/avro.rs +++ b/src/connector/src/sink/encoder/avro.rs @@ -385,6 +385,10 @@ fn encode_field( AvroSchema::Long => maybe.on_base(|s| Ok(Value::Long(s.into_int64())))?, _ => return no_match_err(), }, + DataType::Serial => match inner { + AvroSchema::Long => maybe.on_base(|s| Ok(Value::Long(s.into_serial().into_inner())))?, + _ => return no_match_err(), + }, DataType::Struct(st) => match inner { AvroSchema::Record { .. } => maybe.on_struct(st, inner)?, _ => return no_match_err(), @@ -447,7 +451,7 @@ fn encode_field( DataType::Decimal => return no_match_err(), DataType::Jsonb => return no_match_err(), // Group D: unsupported - DataType::Serial | DataType::Int256 => { + DataType::Int256 => { return no_match_err(); } }; @@ -531,6 +535,13 @@ mod tests { Value::Long(i64::MAX), ); + test_ok( + &DataType::Serial, + Some(ScalarImpl::Serial(i64::MAX.into())), + r#""long""#, + Value::Long(i64::MAX), + ); + let tstz = "2018-01-26T18:30:09.453Z".parse().unwrap(); test_ok( &DataType::Timestamptz, diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs index 2ee8bfcc0d27..e1ce9e61b6a1 100644 --- a/src/connector/src/sink/encoder/json.rs +++ b/src/connector/src/sink/encoder/json.rs @@ -209,6 +209,10 @@ fn datum_to_json_object( (DataType::Int64, ScalarRefImpl::Int64(v)) => { json!(v) } + (DataType::Serial, ScalarRefImpl::Serial(v)) => { + // The serial type needs to be handled as a string to prevent primary key conflicts caused by the precision issues of JSON numbers. + json!(format!("{:#018x}", v.into_inner())) + } (DataType::Float32, ScalarRefImpl::Float32(v)) => { json!(f32::from(v)) } @@ -402,7 +406,7 @@ pub(crate) fn schema_type_mapping(rw_type: &DataType) -> &'static str { DataType::List(_) => "array", DataType::Bytea => "bytes", DataType::Jsonb => "string", - DataType::Serial => "int32", + DataType::Serial => "string", DataType::Int256 => "string", } } @@ -434,13 +438,13 @@ fn type_as_json_schema(rw_type: &DataType) -> Map { #[cfg(test)] mod tests { - use risingwave_common::types::{ Date, Decimal, Interval, Scalar, ScalarImpl, StructRef, StructType, StructValue, Time, Timestamp, }; use super::*; + #[test] fn test_to_json_basic_type() { let mock_field = Field { @@ -498,6 +502,24 @@ mod tests { i64::MAX.to_string() ); + let serial_value = datum_to_json_object( + &Field { + data_type: DataType::Serial, + ..mock_field.clone() + }, + Some(ScalarImpl::Serial(i64::MAX.into()).as_scalar_ref_impl()), + DateHandlingMode::FromCe, + TimestampHandlingMode::String, + TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, + &CustomJsonType::None, + ) + .unwrap(); + assert_eq!( + serde_json::to_string(&serial_value).unwrap(), + format!("\"{:#018x}\"", i64::MAX) + ); + // https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/time/ZonedTimestamp.java let tstz_inner = "2018-01-26T18:30:09.453Z".parse().unwrap(); let tstz_value = datum_to_json_object( @@ -820,7 +842,7 @@ mod tests { let schema = json_converter_with_schema(json!({}), "test".to_owned(), fields.iter()) ["schema"] .to_string(); - let ans = r#"{"fields":[{"field":"v1","optional":true,"type":"boolean"},{"field":"v2","optional":true,"type":"int16"},{"field":"v3","optional":true,"type":"int32"},{"field":"v4","optional":true,"type":"float"},{"field":"v5","optional":true,"type":"string"},{"field":"v6","optional":true,"type":"int32"},{"field":"v7","optional":true,"type":"string"},{"field":"v8","optional":true,"type":"int64"},{"field":"v9","optional":true,"type":"string"},{"field":"v10","fields":[{"field":"a","optional":true,"type":"int64"},{"field":"b","optional":true,"type":"string"},{"field":"c","fields":[{"field":"aa","optional":true,"type":"int64"},{"field":"bb","optional":true,"type":"double"}],"optional":true,"type":"struct"}],"optional":true,"type":"struct"},{"field":"v11","items":{"items":{"fields":[{"field":"aa","optional":true,"type":"int64"},{"field":"bb","optional":true,"type":"double"}],"optional":true,"type":"struct"},"optional":true,"type":"array"},"optional":true,"type":"array"},{"field":"12","optional":true,"type":"string"},{"field":"13","optional":true,"type":"int32"},{"field":"14","optional":true,"type":"string"}],"name":"test","optional":false,"type":"struct"}"#; + let ans = r#"{"fields":[{"field":"v1","optional":true,"type":"boolean"},{"field":"v2","optional":true,"type":"int16"},{"field":"v3","optional":true,"type":"int32"},{"field":"v4","optional":true,"type":"float"},{"field":"v5","optional":true,"type":"string"},{"field":"v6","optional":true,"type":"int32"},{"field":"v7","optional":true,"type":"string"},{"field":"v8","optional":true,"type":"int64"},{"field":"v9","optional":true,"type":"string"},{"field":"v10","fields":[{"field":"a","optional":true,"type":"int64"},{"field":"b","optional":true,"type":"string"},{"field":"c","fields":[{"field":"aa","optional":true,"type":"int64"},{"field":"bb","optional":true,"type":"double"}],"optional":true,"type":"struct"}],"optional":true,"type":"struct"},{"field":"v11","items":{"items":{"fields":[{"field":"aa","optional":true,"type":"int64"},{"field":"bb","optional":true,"type":"double"}],"optional":true,"type":"struct"},"optional":true,"type":"array"},"optional":true,"type":"array"},{"field":"12","optional":true,"type":"string"},{"field":"13","optional":true,"type":"string"},{"field":"14","optional":true,"type":"string"}],"name":"test","optional":false,"type":"struct"}"#; assert_eq!(schema, ans); } }