From 360a6af9410c125e71f63241abb8713c27de2723 Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Mon, 27 May 2024 14:28:15 -0400 Subject: [PATCH] small refactor --- src/common/src/types/from_sql.rs | 40 ++++++++++------------ src/common/src/types/jsonb.rs | 20 ++++++----- src/connector/src/parser/scalar_adapter.rs | 14 ++++++-- 3 files changed, 42 insertions(+), 32 deletions(-) diff --git a/src/common/src/types/from_sql.rs b/src/common/src/types/from_sql.rs index 7e03b8b4900cc..1dbbce3adf537 100644 --- a/src/common/src/types/from_sql.rs +++ b/src/common/src/types/from_sql.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use postgres_types::{FromSql, Kind, Type}; +use postgres_types::{FromSql, Type}; use risingwave_common::types::{ Date, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz, }; @@ -22,27 +22,23 @@ impl<'a> FromSql<'a> for ScalarImpl { ty: &Type, raw: &'a [u8], ) -> Result> { - Ok(match ty.kind() { - Kind::Simple => match *ty { - Type::BOOL => ScalarImpl::from(bool::from_sql(ty, raw)?), - Type::INT2 => ScalarImpl::from(i16::from_sql(ty, raw)?), - Type::INT4 => ScalarImpl::from(i32::from_sql(ty, raw)?), - Type::INT8 => ScalarImpl::from(i64::from_sql(ty, raw)?), - Type::FLOAT4 => ScalarImpl::from(f32::from_sql(ty, raw)?), - Type::FLOAT8 => ScalarImpl::from(f64::from_sql(ty, raw)?), - Type::DATE => ScalarImpl::from(Date::from_sql(ty, raw)?), - Type::TIME => ScalarImpl::from(Time::from_sql(ty, raw)?), - Type::TIMESTAMP => ScalarImpl::from(Timestamp::from_sql(ty, raw)?), - Type::TIMESTAMPTZ => ScalarImpl::from(Timestamptz::from_sql(ty, raw)?), - Type::JSONB => ScalarImpl::from(JsonbVal::from_sql(ty, raw)?), - Type::INTERVAL => ScalarImpl::from(Interval::from_sql(ty, raw)?), - Type::BYTEA => ScalarImpl::from(Vec::::from_sql(ty, raw)?.into_boxed_slice()), - Type::VARCHAR | Type::TEXT | Type::BPCHAR | Type::NAME | Type::UNKNOWN => { - ScalarImpl::from(String::from_sql(ty, raw)?) - } - // Serial, Int256, Struct, List and Decimal are not supported here - _ => bail_not_implemented!("the postgres decoding for {ty} is unsupported"), - }, + Ok(match *ty { + Type::BOOL => ScalarImpl::from(bool::from_sql(ty, raw)?), + Type::INT2 => ScalarImpl::from(i16::from_sql(ty, raw)?), + Type::INT4 => ScalarImpl::from(i32::from_sql(ty, raw)?), + Type::INT8 => ScalarImpl::from(i64::from_sql(ty, raw)?), + Type::FLOAT4 => ScalarImpl::from(f32::from_sql(ty, raw)?), + Type::FLOAT8 => ScalarImpl::from(f64::from_sql(ty, raw)?), + Type::DATE => ScalarImpl::from(Date::from_sql(ty, raw)?), + Type::TIME => ScalarImpl::from(Time::from_sql(ty, raw)?), + Type::TIMESTAMP => ScalarImpl::from(Timestamp::from_sql(ty, raw)?), + Type::TIMESTAMPTZ => ScalarImpl::from(Timestamptz::from_sql(ty, raw)?), + Type::JSONB => ScalarImpl::from(JsonbVal::from_sql(ty, raw)?), + Type::INTERVAL => ScalarImpl::from(Interval::from_sql(ty, raw)?), + Type::BYTEA => ScalarImpl::from(Vec::::from_sql(ty, raw)?.into_boxed_slice()), + Type::VARCHAR | Type::TEXT => ScalarImpl::from(String::from_sql(ty, raw)?), + // Serial, Int256, Struct, List and Decimal are not supported here + // Note: The Decimal type is specially handled in the `ScalarAdapter`. _ => bail_not_implemented!("the postgres decoding for {ty} is unsupported"), }) } diff --git a/src/common/src/types/jsonb.rs b/src/common/src/types/jsonb.rs index 0bee7d338fbc4..824020fac3123 100644 --- a/src/common/src/types/jsonb.rs +++ b/src/common/src/types/jsonb.rs @@ -12,10 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt; +use std::fmt::{self, Write}; use std::hash::Hash; -use bytes::{Buf, BytesMut}; +use bytes::{Buf, BufMut, BytesMut}; use jsonbb::{Value, ValueRef}; use postgres_types::{accepts, to_sql_checked, FromSql, IsNull, ToSql, Type}; use risingwave_common_estimate_size::EstimateSize; @@ -548,23 +548,27 @@ impl ToSql for JsonbVal { fn to_sql( &self, - ty: &Type, + _ty: &Type, out: &mut BytesMut, ) -> Result> where Self: Sized, { - serde_json::Value::from(self.0.clone()).to_sql(ty, out) + out.put_u8(1); + write!(out, "{}", self.0).unwrap(); + Ok(IsNull::No) } } impl<'a> FromSql<'a> for JsonbVal { fn from_sql( - ty: &Type, - raw: &'a [u8], + _ty: &Type, + mut raw: &'a [u8], ) -> Result> { - let instant = serde_json::Value::from_sql(ty, raw)?; - Ok(JsonbVal::from(instant)) + if raw.is_empty() || raw.get_u8() != 1 { + return Err("invalid jsonb encoding".into()); + } + Ok(JsonbVal::from(Value::from_text(raw)?)) } fn accepts(ty: &Type) -> bool { diff --git a/src/connector/src/parser/scalar_adapter.rs b/src/connector/src/parser/scalar_adapter.rs index 539ba3a3013c0..0f5d2d6d6d935 100644 --- a/src/connector/src/parser/scalar_adapter.rs +++ b/src/connector/src/parser/scalar_adapter.rs @@ -84,6 +84,8 @@ pub(crate) enum ScalarAdapter { Enum(EnumString), NumericList(Vec>), EnumList(Vec>), + // UuidList is covered by List, while NumericList and EnumList are special cases. + // Note: The IntervalList is not supported. List(Vec>), } @@ -120,6 +122,8 @@ impl<'a> FromSql<'a> for ScalarAdapter { match ty.kind() { Kind::Simple => match *ty { Type::UUID => Ok(ScalarAdapter::Uuid(uuid::Uuid::from_sql(ty, raw)?)), + // In order to cover the decimal beyond RustDecimal(only 28 digits are supported), + // we use the PgNumeric to handle decimal from postgres. Type::NUMERIC => Ok(ScalarAdapter::Numeric(PgNumeric::from_sql(ty, raw)?)), _ => Ok(ScalarAdapter::Builtin(ScalarImpl::from_sql(ty, raw)?)), }, @@ -185,7 +189,7 @@ impl ScalarAdapter { Some(ScalarRefImpl::Utf8(s)) => Some(string_to_pg_numeric(s)), None => None, _ => { - unreachable!("Currently, only rw-numeric[], rw_int256[] and varchar[] are supported to convert to pg-numeric[]"); + unreachable!("Only rw-numeric[], rw_int256[] and varchar[] are supported to convert to pg-numeric[]"); } }) } @@ -240,6 +244,8 @@ impl ScalarAdapter { let mut builder = dtype.create_array_builder(0); for val in vec { let scalar = match (val, &dtype) { + // A numeric array contains special values like NaN, Inf, -Inf, which are not supported in Debezium, + // when we encounter these special values, we fallback the array to NULL, returning None directly. (Some(numeric), box DataType::Varchar) => { if pg_numeric_is_special(&numeric) { return None; @@ -251,6 +257,9 @@ impl ScalarAdapter { if pg_numeric_is_special(&numeric) { return None; } else { + // A PgNumeric can sometimes exceeds the range of Int256 and RwNumeric. + // In our json parsing, we fallback the array to NULL in this case. + // Here we keep the behavior consistent and return None directly. match ScalarAdapter::Numeric(numeric).into_scalar(dtype) { Some(scalar) => Some(scalar), None => { @@ -260,8 +269,9 @@ impl ScalarAdapter { } } (Some(_), _) => unreachable!( - "into_scalar_in_list should only be called with ScalarAdapter::Numeric types" + "Only rw-numeric[], rw_int256[] and varchar[] are supported to convert to pg-numeric[]" ), + // This item is NULL, continue to handle next item. (None, _) => None, }; builder.append(scalar);