Skip to content

Commit

Permalink
small refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang committed May 27, 2024
1 parent 0f357f5 commit 360a6af
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 32 deletions.
40 changes: 18 additions & 22 deletions src/common/src/types/from_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use postgres_types::{FromSql, Kind, Type};
use postgres_types::{FromSql, Type};
use risingwave_common::types::{
Date, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
};
Expand All @@ -22,27 +22,23 @@ impl<'a> FromSql<'a> for ScalarImpl {
ty: &Type,
raw: &'a [u8],
) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
Ok(match ty.kind() {
Kind::Simple => match *ty {
Type::BOOL => ScalarImpl::from(bool::from_sql(ty, raw)?),
Type::INT2 => ScalarImpl::from(i16::from_sql(ty, raw)?),
Type::INT4 => ScalarImpl::from(i32::from_sql(ty, raw)?),
Type::INT8 => ScalarImpl::from(i64::from_sql(ty, raw)?),
Type::FLOAT4 => ScalarImpl::from(f32::from_sql(ty, raw)?),
Type::FLOAT8 => ScalarImpl::from(f64::from_sql(ty, raw)?),
Type::DATE => ScalarImpl::from(Date::from_sql(ty, raw)?),
Type::TIME => ScalarImpl::from(Time::from_sql(ty, raw)?),
Type::TIMESTAMP => ScalarImpl::from(Timestamp::from_sql(ty, raw)?),
Type::TIMESTAMPTZ => ScalarImpl::from(Timestamptz::from_sql(ty, raw)?),
Type::JSONB => ScalarImpl::from(JsonbVal::from_sql(ty, raw)?),
Type::INTERVAL => ScalarImpl::from(Interval::from_sql(ty, raw)?),
Type::BYTEA => ScalarImpl::from(Vec::<u8>::from_sql(ty, raw)?.into_boxed_slice()),
Type::VARCHAR | Type::TEXT | Type::BPCHAR | Type::NAME | Type::UNKNOWN => {
ScalarImpl::from(String::from_sql(ty, raw)?)
}
// Serial, Int256, Struct, List and Decimal are not supported here
_ => bail_not_implemented!("the postgres decoding for {ty} is unsupported"),
},
Ok(match *ty {
Type::BOOL => ScalarImpl::from(bool::from_sql(ty, raw)?),
Type::INT2 => ScalarImpl::from(i16::from_sql(ty, raw)?),
Type::INT4 => ScalarImpl::from(i32::from_sql(ty, raw)?),
Type::INT8 => ScalarImpl::from(i64::from_sql(ty, raw)?),
Type::FLOAT4 => ScalarImpl::from(f32::from_sql(ty, raw)?),
Type::FLOAT8 => ScalarImpl::from(f64::from_sql(ty, raw)?),
Type::DATE => ScalarImpl::from(Date::from_sql(ty, raw)?),
Type::TIME => ScalarImpl::from(Time::from_sql(ty, raw)?),
Type::TIMESTAMP => ScalarImpl::from(Timestamp::from_sql(ty, raw)?),
Type::TIMESTAMPTZ => ScalarImpl::from(Timestamptz::from_sql(ty, raw)?),
Type::JSONB => ScalarImpl::from(JsonbVal::from_sql(ty, raw)?),
Type::INTERVAL => ScalarImpl::from(Interval::from_sql(ty, raw)?),
Type::BYTEA => ScalarImpl::from(Vec::<u8>::from_sql(ty, raw)?.into_boxed_slice()),
Type::VARCHAR | Type::TEXT => ScalarImpl::from(String::from_sql(ty, raw)?),
// Serial, Int256, Struct, List and Decimal are not supported here
// Note: The Decimal type is specially handled in the `ScalarAdapter`.
_ => bail_not_implemented!("the postgres decoding for {ty} is unsupported"),
})
}
Expand Down
20 changes: 12 additions & 8 deletions src/common/src/types/jsonb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;
use std::fmt::{self, Write};
use std::hash::Hash;

use bytes::{Buf, BytesMut};
use bytes::{Buf, BufMut, BytesMut};
use jsonbb::{Value, ValueRef};
use postgres_types::{accepts, to_sql_checked, FromSql, IsNull, ToSql, Type};
use risingwave_common_estimate_size::EstimateSize;
Expand Down Expand Up @@ -548,23 +548,27 @@ impl ToSql for JsonbVal {

fn to_sql(
&self,
ty: &Type,
_ty: &Type,
out: &mut BytesMut,
) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>>
where
Self: Sized,
{
serde_json::Value::from(self.0.clone()).to_sql(ty, out)
out.put_u8(1);
write!(out, "{}", self.0).unwrap();
Ok(IsNull::No)
}
}

impl<'a> FromSql<'a> for JsonbVal {
fn from_sql(
ty: &Type,
raw: &'a [u8],
_ty: &Type,
mut raw: &'a [u8],
) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
let instant = serde_json::Value::from_sql(ty, raw)?;
Ok(JsonbVal::from(instant))
if raw.is_empty() || raw.get_u8() != 1 {
return Err("invalid jsonb encoding".into());
}
Ok(JsonbVal::from(Value::from_text(raw)?))
}

fn accepts(ty: &Type) -> bool {
Expand Down
14 changes: 12 additions & 2 deletions src/connector/src/parser/scalar_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ pub(crate) enum ScalarAdapter {
Enum(EnumString),
NumericList(Vec<Option<PgNumeric>>),
EnumList(Vec<Option<EnumString>>),
// UuidList is covered by List, while NumericList and EnumList are special cases.
// Note: The IntervalList is not supported.
List(Vec<Option<ScalarAdapter>>),
}

Expand Down Expand Up @@ -120,6 +122,8 @@ impl<'a> FromSql<'a> for ScalarAdapter {
match ty.kind() {
Kind::Simple => match *ty {
Type::UUID => Ok(ScalarAdapter::Uuid(uuid::Uuid::from_sql(ty, raw)?)),
// In order to cover the decimal beyond RustDecimal(only 28 digits are supported),
// we use the PgNumeric to handle decimal from postgres.
Type::NUMERIC => Ok(ScalarAdapter::Numeric(PgNumeric::from_sql(ty, raw)?)),
_ => Ok(ScalarAdapter::Builtin(ScalarImpl::from_sql(ty, raw)?)),
},
Expand Down Expand Up @@ -185,7 +189,7 @@ impl ScalarAdapter {
Some(ScalarRefImpl::Utf8(s)) => Some(string_to_pg_numeric(s)),
None => None,
_ => {
unreachable!("Currently, only rw-numeric[], rw_int256[] and varchar[] are supported to convert to pg-numeric[]");
unreachable!("Only rw-numeric[], rw_int256[] and varchar[] are supported to convert to pg-numeric[]");
}
})
}
Expand Down Expand Up @@ -240,6 +244,8 @@ impl ScalarAdapter {
let mut builder = dtype.create_array_builder(0);
for val in vec {
let scalar = match (val, &dtype) {
// A numeric array contains special values like NaN, Inf, -Inf, which are not supported in Debezium,
// when we encounter these special values, we fallback the array to NULL, returning None directly.
(Some(numeric), box DataType::Varchar) => {
if pg_numeric_is_special(&numeric) {
return None;
Expand All @@ -251,6 +257,9 @@ impl ScalarAdapter {
if pg_numeric_is_special(&numeric) {
return None;
} else {
// A PgNumeric can sometimes exceeds the range of Int256 and RwNumeric.
// In our json parsing, we fallback the array to NULL in this case.
// Here we keep the behavior consistent and return None directly.
match ScalarAdapter::Numeric(numeric).into_scalar(dtype) {
Some(scalar) => Some(scalar),
None => {
Expand All @@ -260,8 +269,9 @@ impl ScalarAdapter {
}
}
(Some(_), _) => unreachable!(
"into_scalar_in_list should only be called with ScalarAdapter::Numeric types"
"Only rw-numeric[], rw_int256[] and varchar[] are supported to convert to pg-numeric[]"
),
// This item is NULL, continue to handle next item.
(None, _) => None,
};
builder.append(scalar);
Expand Down

0 comments on commit 360a6af

Please sign in to comment.