Skip to content

Commit

Permalink
fix(cdc): align special values of numeric type in postgres-cdc
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang committed Apr 23, 2024
1 parent 7e8b55e commit 9f4be46
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 10 deletions.
37 changes: 37 additions & 0 deletions e2e_test/source/cdc/cdc.share_stream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,13 @@ CREATE TABLE numeric_to_varchar_shared (
PRIMARY KEY (id)
) FROM pg_source TABLE 'public.numeric_table';

statement ok
CREATE TABLE numeric_to_numeric_shared (
id int,
num numeric,
PRIMARY KEY (id)
) FROM pg_source TABLE 'public.numeric_table';

statement ok
CREATE TABLE numeric_list_to_rw_int256_list_shared (
id int,
Expand All @@ -297,6 +304,13 @@ CREATE TABLE numeric_list_to_varchar_list_shared (
PRIMARY KEY (id)
) FROM pg_source TABLE 'public.numeric_list';

statement ok
CREATE TABLE numeric_list_to_numeric_list_shared (
id int,
num numeric[],
PRIMARY KEY (id)
) FROM pg_source TABLE 'public.numeric_list';

statement ok
CREATE TABLE enum_to_varchar_shared (
id int,
Expand Down Expand Up @@ -367,6 +381,23 @@ select * from numeric_to_rw_int256_shared order by id;
106 NULL
107 NULL

query II
select * from numeric_to_numeric_shared order by id;
----
1 3.14
2 NULL
3 NULL
4 NULL
5 NULL
6 NaN
7 Infinity
102 NULL
103 NULL
104 NULL
105 NULL
106 NaN
107 Infinity

system ok
psql -c "
DELETE FROM numeric_table WHERE id IN (102, 103, 104, 105, 106, 107);
Expand All @@ -384,6 +415,12 @@ select * from numeric_list_to_rw_int256_list_shared order by id;
1 {NULL,6,57896044618658097711785492504343953926634992332820282019728792003956564819967,NULL,NULL}
2 {NULL,NULL,NULL}

query II
select * from numeric_list_to_numeric_list_shared order by id;
----
1 {NULL,6,NULL,NULL,NULL}
2 {NaN,Infinity,-Infinity}

query II
select * from enum_to_varchar_shared order by id;
----
Expand Down
27 changes: 25 additions & 2 deletions src/common/src/types/decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@
// limitations under the License.

use std::fmt::Debug;
use std::io::{Read, Write};
use std::io::{Cursor, Read, Write};
use std::ops::{Add, Div, Mul, Neg, Rem, Sub};

use byteorder::{BigEndian, ReadBytesExt};
use bytes::{BufMut, Bytes, BytesMut};
use num_traits::{
CheckedAdd, CheckedDiv, CheckedMul, CheckedNeg, CheckedRem, CheckedSub, Num, One, Zero,
};
use postgres_types::{accepts, to_sql_checked, IsNull, ToSql, Type};
use postgres_types::{accepts, to_sql_checked, FromSql, IsNull, ToSql, Type};
use risingwave_common_estimate_size::ZeroHeapSize;
use rust_decimal::prelude::FromStr;
use rust_decimal::{Decimal as RustDecimal, Error, MathematicalOps as _, RoundingStrategy};
Expand Down Expand Up @@ -145,6 +146,28 @@ impl ToSql for Decimal {
}
}

impl<'a> FromSql<'a> for Decimal {
fn from_sql(
ty: &Type,
raw: &'a [u8],
) -> Result<Self, Box<dyn std::error::Error + 'static + Sync + Send>> {
let mut rdr = Cursor::new(raw);
let _n_digits = rdr.read_u16::<BigEndian>()?;
let _weight = rdr.read_i16::<BigEndian>()?;
let sign = rdr.read_u16::<BigEndian>()?;
match sign {
0xC000 => Ok(Self::NaN),
0xD000 => Ok(Self::PositiveInf),
0xF000 => Ok(Self::NegativeInf),
_ => RustDecimal::from_sql(ty, raw).map(Self::Normalized),
}
}

fn accepts(ty: &Type) -> bool {
matches!(*ty, Type::NUMERIC)
}
}

macro_rules! impl_convert_int {
($T:ty) => {
impl core::convert::From<$T> for Decimal {
Expand Down
8 changes: 4 additions & 4 deletions src/connector/src/parser/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use risingwave_common::types::{
DataType, Date, Decimal, Int256, Interval, JsonbVal, ListValue, ScalarImpl, Time, Timestamp,
Timestamptz,
};
use rust_decimal::Decimal as RustDecimal;
use thiserror_ext::AsReport;
use tokio_postgres::types::{to_sql_checked, FromSql, IsNull, Kind, ToSql, Type};

Expand Down Expand Up @@ -114,7 +113,7 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
handle_data_type!(row, i, name, f64)
}
DataType::Decimal => {
handle_data_type!(row, i, name, RustDecimal, Decimal)
handle_data_type!(row, i, name, Decimal)
}
DataType::Int256 => {
// Currently in order to handle the decimal beyond RustDecimal,
Expand Down Expand Up @@ -246,7 +245,7 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
handle_list_data_type!(row, i, name, f64, builder);
}
DataType::Decimal => {
handle_list_data_type!(row, i, name, RustDecimal, builder, Decimal);
handle_list_data_type!(row, i, name, Decimal, builder);
}
DataType::Date => {
handle_list_data_type!(row, i, name, NaiveDate, builder, Date);
Expand Down Expand Up @@ -415,7 +414,8 @@ fn pg_numeric_to_varchar(val: Option<PgNumeric>) -> Option<ScalarImpl> {

fn pg_numeric_to_string(val: Option<PgNumeric>) -> Option<String> {
if let Some(pg_numeric) = val {
// TODO(kexiang): NEGATIVE_INFINITY -> -Infinity, POSITIVE_INFINITY -> Infinity, NAN -> NaN
// FIXME(kexiang): NEGATIVE_INFINITY -> -Infinity, POSITIVE_INFINITY -> Infinity, NAN -> NaN
// https://github.com/risingwavelabs/risingwave/issues/16395
// The current implementation is to ensure consistency with the behavior of cdc event parsor.
match pg_numeric {
PgNumeric::NegativeInf => Some(String::from("NEGATIVE_INFINITY")),
Expand Down
14 changes: 10 additions & 4 deletions src/connector/src/parser/unified/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,10 +385,16 @@ impl JsonParseOptions {
.map_err(|_| create_error())?
.into()
}

(Some(DataType::Decimal), ValueType::String) => ScalarImpl::Decimal(
Decimal::from_str(value.as_str().unwrap()).map_err(|_err| create_error())?,
),
(Some(DataType::Decimal), ValueType::String) => {
let str = value.as_str().unwrap();
// the following values are special string generated by Debezium and should be handled separately
match str {
"NAN" => ScalarImpl::Decimal(Decimal::NaN),
"POSITIVE_INFINITY" => ScalarImpl::Decimal(Decimal::PositiveInf),
"NEGATIVE_INFINITY" => ScalarImpl::Decimal(Decimal::NegativeInf),
_ => ScalarImpl::Decimal(Decimal::from_str(str).map_err(|_err| create_error())?),
}
}
(Some(DataType::Decimal), ValueType::Object) => {
// ref https://github.com/risingwavelabs/risingwave/issues/10628
// handle debezium json (variable scale): {"scale": int, "value": bytes}
Expand Down

0 comments on commit 9f4be46

Please sign in to comment.