From 929a27d8c80cc5227dda565bcdccf71d98729cb5 Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Thu, 18 Apr 2024 20:46:11 -0400 Subject: [PATCH] fix(cdc): align special values of numeric type in postgres-cdc --- e2e_test/source/cdc/cdc.share_stream.slt | 40 ++++++++++++++++++++++-- src/common/src/types/decimal.rs | 27 ++++++++++++++-- src/connector/src/parser/postgres.rs | 8 ++--- src/connector/src/parser/unified/json.rs | 14 ++++++--- 4 files changed, 77 insertions(+), 12 deletions(-) diff --git a/e2e_test/source/cdc/cdc.share_stream.slt b/e2e_test/source/cdc/cdc.share_stream.slt index b9f5ef01c49ec..96247f52f0997 100644 --- a/e2e_test/source/cdc/cdc.share_stream.slt +++ b/e2e_test/source/cdc/cdc.share_stream.slt @@ -283,6 +283,13 @@ CREATE TABLE numeric_to_varchar_shared ( PRIMARY KEY (id) ) FROM pg_source TABLE 'public.numeric_table'; +statement ok +CREATE TABLE numeric_to_numeric_shared ( + id int, + num numeric, + PRIMARY KEY (id) +) FROM pg_source TABLE 'public.numeric_table'; + statement ok CREATE TABLE numeric_list_to_rw_int256_list_shared ( id int, @@ -297,6 +304,13 @@ CREATE TABLE numeric_list_to_varchar_list_shared ( PRIMARY KEY (id) ) FROM pg_source TABLE 'public.numeric_list'; +statement ok +CREATE TABLE numeric_list_to_numeric_list_shared ( + id int, + num numeric[], + PRIMARY KEY (id) +) FROM pg_source TABLE 'public.numeric_list'; + statement ok CREATE TABLE enum_to_varchar_shared ( id int, @@ -367,6 +381,23 @@ select * from numeric_to_rw_int256_shared order by id; 106 NULL 107 NULL +query II +select * from numeric_to_numeric_shared order by id; +---- +1 3.14 +2 NULL +3 NULL +4 NULL +5 NULL +6 NaN +7 Infinity +102 NULL +103 NULL +104 NULL +105 NULL +106 NaN +107 Infinity + system ok psql -c " DELETE FROM numeric_table WHERE id IN (102, 103, 104, 105, 106, 107); @@ -384,6 +415,12 @@ select * from numeric_list_to_rw_int256_list_shared order by id; 1 {NULL,6,57896044618658097711785492504343953926634992332820282019728792003956564819967,NULL,NULL} 2 {NULL,NULL,NULL} +query II +select * from numeric_list_to_numeric_list_shared order by id; +---- +1 {NULL,6,NULL,NULL,NULL} +2 {NaN,Infinity,-Infinity} + query II select * from enum_to_varchar_shared order by id; ---- @@ -396,12 +433,11 @@ select id, my_int from list_with_null_shared order by id; 1 {1,2,NULL} 2 {NULL,-1,-2} -# will fix in https://github.com/risingwavelabs/risingwave/pull/16416 query II select id, my_num from list_with_null_shared order by id; ---- 1 {1.1,POSITIVE_INFINITY,NULL} -2 NULL +2 {NULL,NAN,NEGATIVE_INFINITY} query II select id, my_mood from list_with_null_shared order by id; diff --git a/src/common/src/types/decimal.rs b/src/common/src/types/decimal.rs index b6031a1665540..b38dbb4822e68 100644 --- a/src/common/src/types/decimal.rs +++ b/src/common/src/types/decimal.rs @@ -13,14 +13,15 @@ // limitations under the License. use std::fmt::Debug; -use std::io::{Read, Write}; +use std::io::{Cursor, Read, Write}; use std::ops::{Add, Div, Mul, Neg, Rem, Sub}; +use byteorder::{BigEndian, ReadBytesExt}; use bytes::{BufMut, Bytes, BytesMut}; use num_traits::{ CheckedAdd, CheckedDiv, CheckedMul, CheckedNeg, CheckedRem, CheckedSub, Num, One, Zero, }; -use postgres_types::{accepts, to_sql_checked, IsNull, ToSql, Type}; +use postgres_types::{accepts, to_sql_checked, FromSql, IsNull, ToSql, Type}; use risingwave_common_estimate_size::ZeroHeapSize; use rust_decimal::prelude::FromStr; use rust_decimal::{Decimal as RustDecimal, Error, MathematicalOps as _, RoundingStrategy}; @@ -145,6 +146,28 @@ impl ToSql for Decimal { } } +impl<'a> FromSql<'a> for Decimal { + fn from_sql( + ty: &Type, + raw: &'a [u8], + ) -> Result> { + let mut rdr = Cursor::new(raw); + let _n_digits = rdr.read_u16::()?; + let _weight = rdr.read_i16::()?; + let sign = rdr.read_u16::()?; + match sign { + 0xC000 => Ok(Self::NaN), + 0xD000 => Ok(Self::PositiveInf), + 0xF000 => Ok(Self::NegativeInf), + _ => RustDecimal::from_sql(ty, raw).map(Self::Normalized), + } + } + + fn accepts(ty: &Type) -> bool { + matches!(*ty, Type::NUMERIC) + } +} + macro_rules! impl_convert_int { ($T:ty) => { impl core::convert::From<$T> for Decimal { diff --git a/src/connector/src/parser/postgres.rs b/src/connector/src/parser/postgres.rs index c5c9762bad1da..db20f244b46e1 100644 --- a/src/connector/src/parser/postgres.rs +++ b/src/connector/src/parser/postgres.rs @@ -25,7 +25,6 @@ use risingwave_common::types::{ DataType, Date, Decimal, Int256, Interval, JsonbVal, ListValue, ScalarImpl, Time, Timestamp, Timestamptz, }; -use rust_decimal::Decimal as RustDecimal; use thiserror_ext::AsReport; use tokio_postgres::types::{to_sql_checked, FromSql, IsNull, Kind, ToSql, Type}; @@ -114,7 +113,7 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O handle_data_type!(row, i, name, f64) } DataType::Decimal => { - handle_data_type!(row, i, name, RustDecimal, Decimal) + handle_data_type!(row, i, name, Decimal) } DataType::Int256 => { // Currently in order to handle the decimal beyond RustDecimal, @@ -246,7 +245,7 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O handle_list_data_type!(row, i, name, f64, builder); } DataType::Decimal => { - handle_list_data_type!(row, i, name, RustDecimal, builder, Decimal); + handle_list_data_type!(row, i, name, Decimal, builder); } DataType::Date => { handle_list_data_type!(row, i, name, NaiveDate, builder, Date); @@ -415,7 +414,8 @@ fn pg_numeric_to_varchar(val: Option) -> Option { fn pg_numeric_to_string(val: Option) -> Option { if let Some(pg_numeric) = val { - // TODO(kexiang): NEGATIVE_INFINITY -> -Infinity, POSITIVE_INFINITY -> Infinity, NAN -> NaN + // FIXME(kexiang): NEGATIVE_INFINITY -> -Infinity, POSITIVE_INFINITY -> Infinity, NAN -> NaN + // https://github.com/risingwavelabs/risingwave/issues/16395 // The current implementation is to ensure consistency with the behavior of cdc event parsor. match pg_numeric { PgNumeric::NegativeInf => Some(String::from("NEGATIVE_INFINITY")), diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index d9deb73d582d9..11c569832268e 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -385,10 +385,16 @@ impl JsonParseOptions { .map_err(|_| create_error())? .into() } - - (Some(DataType::Decimal), ValueType::String) => ScalarImpl::Decimal( - Decimal::from_str(value.as_str().unwrap()).map_err(|_err| create_error())?, - ), + (Some(DataType::Decimal), ValueType::String) => { + let str = value.as_str().unwrap(); + // the following values are special string generated by Debezium and should be handled separately + match str { + "NAN" => ScalarImpl::Decimal(Decimal::NaN), + "POSITIVE_INFINITY" => ScalarImpl::Decimal(Decimal::PositiveInf), + "NEGATIVE_INFINITY" => ScalarImpl::Decimal(Decimal::NegativeInf), + _ => ScalarImpl::Decimal(Decimal::from_str(str).map_err(|_err| create_error())?), + } + } (Some(DataType::Decimal), ValueType::Object) => { // ref https://github.com/risingwavelabs/risingwave/issues/10628 // handle debezium json (variable scale): {"scale": int, "value": bytes}