Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(types): doc & refine ToBinary/Text #17697

Merged
merged 2 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 1 addition & 41 deletions src/common/src/types/datetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@ use std::hash::Hash;
use std::io::Write;
use std::str::FromStr;

use bytes::{Bytes, BytesMut};
use bytes::BytesMut;
use chrono::{
DateTime, Datelike, Days, Duration, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Weekday,
};
use postgres_types::{accepts, to_sql_checked, FromSql, IsNull, ToSql, Type};
use risingwave_common_estimate_size::ZeroHeapSize;
use thiserror::Error;

use super::to_binary::ToBinary;
use super::to_text::ToText;
use super::{CheckedAdd, DataType, Interval};
use crate::array::{ArrayError, ArrayResult};
Expand Down Expand Up @@ -427,45 +426,6 @@ impl ToText for Timestamp {
}
}

impl ToBinary for Date {
fn to_binary_with_type(&self, ty: &DataType) -> super::to_binary::Result<Option<Bytes>> {
match ty {
super::DataType::Date => {
let mut output = BytesMut::new();
self.0.to_sql(&Type::ANY, &mut output).unwrap();
Ok(Some(output.freeze()))
}
_ => unreachable!(),
}
}
}

impl ToBinary for Time {
fn to_binary_with_type(&self, ty: &DataType) -> super::to_binary::Result<Option<Bytes>> {
match ty {
super::DataType::Time => {
let mut output = BytesMut::new();
self.0.to_sql(&Type::ANY, &mut output).unwrap();
Ok(Some(output.freeze()))
}
_ => unreachable!(),
}
}
}

impl ToBinary for Timestamp {
fn to_binary_with_type(&self, ty: &DataType) -> super::to_binary::Result<Option<Bytes>> {
match ty {
super::DataType::Timestamp => {
let mut output = BytesMut::new();
self.0.to_sql(&Type::ANY, &mut output).unwrap();
Ok(Some(output.freeze()))
}
_ => unreachable!(),
}
}
}

impl Date {
pub fn with_days(days: i32) -> Result<Self> {
Ok(Date::new(
Expand Down
16 changes: 1 addition & 15 deletions src/common/src/types/decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::io::{Cursor, Read, Write};
use std::ops::{Add, Div, Mul, Neg, Rem, Sub};

use byteorder::{BigEndian, ReadBytesExt};
use bytes::{BufMut, Bytes, BytesMut};
use bytes::{BufMut, BytesMut};
use num_traits::{
CheckedAdd, CheckedDiv, CheckedMul, CheckedNeg, CheckedRem, CheckedSub, Num, One, Zero,
};
Expand All @@ -26,7 +26,6 @@ use risingwave_common_estimate_size::ZeroHeapSize;
use rust_decimal::prelude::FromStr;
use rust_decimal::{Decimal as RustDecimal, Error, MathematicalOps as _, RoundingStrategy};

use super::to_binary::ToBinary;
use super::to_text::ToText;
use super::DataType;
use crate::array::ArrayResult;
Expand Down Expand Up @@ -90,19 +89,6 @@ impl Decimal {
}
}

impl ToBinary for Decimal {
fn to_binary_with_type(&self, ty: &DataType) -> super::to_binary::Result<Option<Bytes>> {
match ty {
DataType::Decimal => {
let mut output = BytesMut::new();
self.to_sql(&Type::NUMERIC, &mut output).unwrap();
Ok(Some(output.freeze()))
}
_ => unreachable!(),
}
}
}

impl ToSql for Decimal {
accepts!(NUMERIC);

Expand Down
13 changes: 0 additions & 13 deletions src/common/src/types/interval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1181,19 +1181,6 @@ impl<'a> FromSql<'a> for Interval {
}
}

impl ToBinary for Interval {
fn to_binary_with_type(&self, ty: &DataType) -> super::to_binary::Result<Option<Bytes>> {
match ty {
DataType::Interval => {
let mut output = BytesMut::new();
self.to_sql(&Type::ANY, &mut output).unwrap();
Ok(Some(output.freeze()))
}
_ => unreachable!(),
}
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum DateTimeField {
Year,
Expand Down
22 changes: 17 additions & 5 deletions src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ pub use self::serial::Serial;
pub use self::struct_type::StructType;
pub use self::successor::Successor;
pub use self::timestamptz::*;
pub use self::to_binary::ToBinary;
pub use self::to_text::ToText;
pub use self::with_data_type::WithDataType;

Expand Down Expand Up @@ -505,6 +504,10 @@ macro_rules! scalar_impl_enum {
($( { $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty } ),*) => {
/// `ScalarImpl` embeds all possible scalars in the evaluation framework.
///
/// Note: `ScalarImpl` doesn't contain all information of its `DataType`,
/// so sometimes they need to be used together.
/// e.g., for `Struct`, we don't have the field names in the value.
///
/// See `for_all_variants` for the definition.
#[derive(Debug, Clone, PartialEq, Eq, EstimateSize)]
pub enum ScalarImpl {
Expand All @@ -513,6 +516,12 @@ macro_rules! scalar_impl_enum {

/// `ScalarRefImpl` embeds all possible scalar references in the evaluation
/// framework.
///
/// Note: `ScalarRefImpl` doesn't contain all information of its `DataType`,
/// so sometimes they need to be used together.
/// e.g., for `Struct`, we don't have the field names in the value.
///
/// See `for_all_variants` for the definition.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ScalarRefImpl<'scalar> {
$( $variant_name($scalar_ref) ),*
Expand Down Expand Up @@ -791,7 +800,9 @@ impl From<Bytes> for ScalarImpl {
}

impl ScalarImpl {
/// Creates a scalar from binary.
/// Creates a scalar from pgwire "BINARY" format.
///
/// The counterpart of [`to_binary::ToBinary`].
pub fn from_binary(bytes: &Bytes, data_type: &DataType) -> Result<Self, BoxedError> {
let res = match data_type {
DataType::Varchar => Self::Utf8(String::from_sql(&Type::VARCHAR, bytes)?.into()),
Expand Down Expand Up @@ -827,7 +838,9 @@ impl ScalarImpl {
Ok(res)
}

/// Creates a scalar from text.
/// Creates a scalar from pgwire "TEXT" format.
///
/// The counterpart of [`ToText`].
pub fn from_text(s: &str, data_type: &DataType) -> Result<Self, BoxedError> {
Ok(match data_type {
DataType::Boolean => str_to_bool(s)?.into(),
Expand Down Expand Up @@ -908,9 +921,8 @@ pub fn hash_datum(datum: impl ToDatumRef, state: &mut impl std::hash::Hasher) {
}

impl ScalarRefImpl<'_> {
/// Encode the scalar to postgresql binary format.
/// The encoder implements encoding using <https://docs.rs/postgres-types/0.2.3/postgres_types/trait.ToSql.html>
pub fn binary_format(&self, data_type: &DataType) -> to_binary::Result<Bytes> {
use self::to_binary::ToBinary;
self.to_binary_with_type(data_type).transpose().unwrap()
}

Expand Down
13 changes: 1 addition & 12 deletions src/common/src/types/serial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::util::row_id::RowId;

// Serial is an alias for i64
#[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd, Default, Hash)]
pub struct Serial(i64);
pub struct Serial(pub(crate) i64);

impl From<Serial> for i64 {
fn from(value: Serial) -> i64 {
Expand Down Expand Up @@ -75,17 +75,6 @@ impl crate::types::to_text::ToText for Serial {
}
}

impl crate::types::to_binary::ToBinary for Serial {
fn to_binary_with_type(
&self,
_ty: &crate::types::DataType,
) -> super::to_binary::Result<Option<bytes::Bytes>> {
let mut output = bytes::BytesMut::new();
self.0.to_sql(&Type::ANY, &mut output).unwrap();
Ok(Some(output.freeze()))
}
}

impl ToSql for Serial {
accepts!(INT8);

Expand Down
15 changes: 1 addition & 14 deletions src/common/src/types/timestamptz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@ use std::error::Error;
use std::io::Write;
use std::str::FromStr;

use bytes::{Bytes, BytesMut};
use bytes::BytesMut;
use chrono::{DateTime, Datelike, TimeZone, Utc};
use chrono_tz::Tz;
use postgres_types::{accepts, to_sql_checked, FromSql, IsNull, ToSql, Type};
use risingwave_common_estimate_size::ZeroHeapSize;
use serde::{Deserialize, Serialize};

use super::to_binary::ToBinary;
use super::to_text::ToText;
use super::DataType;
use crate::array::ArrayResult;
Expand Down Expand Up @@ -65,18 +64,6 @@ impl<'a> FromSql<'a> for Timestamptz {
}
}

impl ToBinary for Timestamptz {
fn to_binary_with_type(&self, _ty: &DataType) -> super::to_binary::Result<Option<Bytes>> {
let instant = self.to_datetime_utc();
let mut out = BytesMut::new();
// postgres_types::Type::ANY is only used as a placeholder.
instant
.to_sql(&postgres_types::Type::ANY, &mut out)
.unwrap();
Ok(Some(out.freeze()))
}
}

impl ToText for Timestamptz {
fn write<W: std::fmt::Write>(&self, f: &mut W) -> std::fmt::Result {
// Just a meaningful representation as placeholder. The real implementation depends
Expand Down
32 changes: 17 additions & 15 deletions src/common/src/types/to_binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
use bytes::{Bytes, BytesMut};
use postgres_types::{ToSql, Type};

use super::{DataType, DatumRef, ScalarRefImpl, F32, F64};
use super::{
DataType, Date, Decimal, Interval, ScalarRefImpl, Serial, Time, Timestamp, Timestamptz, F32,
F64,
};
use crate::error::NotImplemented;

/// Error type for [`ToBinary`] trait.
Expand All @@ -30,14 +33,15 @@ pub enum ToBinaryError {

pub type Result<T> = std::result::Result<T, ToBinaryError>;

// Used to convert ScalarRef to text format
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤦

/// Converts `ScalarRef` to pgwire "BINARY" format.
///
/// [`postgres_types::ToSql`] has similar functionality, and most of our types implement
/// that trait and forward `ToBinary` to it directly.
pub trait ToBinary {
fn to_binary_with_type(&self, ty: &DataType) -> Result<Option<Bytes>>;
}

// implement use to_sql
macro_rules! implement_using_to_sql {
($({ $scalar_type:ty, $data_type:ident, $accessor:expr } ),*) => {
($({ $scalar_type:ty, $data_type:ident, $accessor:expr } ),* $(,)?) => {
$(
impl ToBinary for $scalar_type {
fn to_binary_with_type(&self, ty: &DataType) -> Result<Option<Bytes>> {
Expand All @@ -64,7 +68,14 @@ implement_using_to_sql! {
{ F32, Float32, |x: &F32| x.0 },
{ F64, Float64, |x: &F64| x.0 },
{ bool, Boolean, |x| x },
{ &[u8], Bytea, |x| x }
{ &[u8], Bytea, |x| x },
{ Time, Time, |x: &Time| x.0 },
{ Date, Date, |x: &Date| x.0 },
{ Timestamp, Timestamp, |x: &Timestamp| x.0 },
{ Decimal, Decimal, |x| x },
{ Interval, Interval, |x| x },
{ Serial, Serial, |x: &Serial| x.0 },
{ Timestamptz, Timestamptz, |x: &Timestamptz| x.to_datetime_utc() }
}

impl ToBinary for ScalarRefImpl<'_> {
Expand Down Expand Up @@ -94,12 +105,3 @@ impl ToBinary for ScalarRefImpl<'_> {
}
}
}

impl ToBinary for DatumRef<'_> {
fn to_binary_with_type(&self, ty: &DataType) -> Result<Option<Bytes>> {
match self {
Some(scalar) => scalar.to_binary_with_type(ty),
None => Ok(None),
}
}
}
43 changes: 22 additions & 21 deletions src/common/src/types/to_text.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,24 @@ use std::num::FpCategory;
use super::{DataType, DatumRef, ScalarRefImpl};
use crate::dispatch_scalar_ref_variants;

// Used to convert ScalarRef to text format
/// Converts `ScalarRef` to pgwire "TEXT" format.
///
/// ## Relationship with casting to varchar
///
/// For most types, this is also the implementation for casting to varchar, but there are exceptions.
/// e.g., The TEXT format for boolean is `t` / `f` while they cast to varchar `true` / `false`.
/// - <https://github.com/postgres/postgres/blob/REL_16_3/src/include/catalog/pg_cast.dat#L438-L439>
/// - <https://www.postgresql.org/docs/16/sql-createcast.html#:~:text=A%20small%20number%20of%20the%20built%2Din%20types%20do%20indeed%20have%20different%20behaviors%20for%20conversions%2C%20mostly%20because%20of%20requirements%20of%20the%20SQL%20standard>
///
/// ## Relationship with `ToString`/`Display`
///
/// For some types, the implementation diverge from Rust's standard `ToString`/`Display`,
/// to match PostgreSQL's representation.
///
/// ---
///
/// FIXME: `ToText` should depend on a lot of other stuff
/// but we have not implemented them yet: timezone, date style, interval style, bytea output, etc
pub trait ToText {
/// Write the text to the writer *regardless* of its data type
///
Expand All @@ -39,26 +56,10 @@ pub trait ToText {
/// text. E.g. for Int64, it will convert to text as a Int64 type.
/// We should prefer to use `to_text_with_type` because it's more clear and readable.
///
/// Following is the relationship between scalar and default type:
/// - `ScalarRefImpl::Int16` -> `DataType::Int16`
/// - `ScalarRefImpl::Int32` -> `DataType::Int32`
/// - `ScalarRefImpl::Int64` -> `DataType::Int64`
/// - `ScalarRefImpl::Int256` -> `DataType::Int256`
/// - `ScalarRefImpl::Float32` -> `DataType::Float32`
/// - `ScalarRefImpl::Float64` -> `DataType::Float64`
/// - `ScalarRefImpl::Decimal` -> `DataType::Decimal`
/// - `ScalarRefImpl::Bool` -> `DataType::Boolean`
/// - `ScalarRefImpl::Utf8` -> `DataType::Varchar`
/// - `ScalarRefImpl::Bytea` -> `DataType::Bytea`
/// - `ScalarRefImpl::Date` -> `DataType::Date`
/// - `ScalarRefImpl::Time` -> `DataType::Time`
/// - `ScalarRefImpl::Timestamp` -> `DataType::Timestamp`
/// - `ScalarRefImpl::Timestamptz` -> `DataType::Timestamptz`
/// - `ScalarRefImpl::Interval` -> `DataType::Interval`
/// - `ScalarRefImpl::Jsonb` -> `DataType::Jsonb`
/// - `ScalarRefImpl::List` -> `DataType::List`
/// - `ScalarRefImpl::Struct` -> `DataType::Struct`
/// - `ScalarRefImpl::Serial` -> `DataType::Serial`
/// Note: currently the `DataType` param is actually unnecessary.
/// Previously, Timestamptz is also represented as int64, and we need the data type to distinguish them.
/// Now we have 1-1 mapping, and it happens to be the case that PostgreSQL default `ToText` format does
/// not need additional metadata like field names contained in `DataType`.
fn to_text(&self) -> String {
let mut s = String::new();
self.write(&mut s).unwrap();
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/parser/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ mod tests {
use mysql_async::Row as MySqlRow;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, ToText};
use risingwave_common::types::DataType;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change was made during browsing all references of ToText. Don't think it's worth exposing it here.

use tokio_stream::StreamExt;

use crate::parser::mysql_row_to_owned_row;
Expand Down Expand Up @@ -187,7 +187,7 @@ mod tests {
let d = owned_row.datum_at(2);
if let Some(scalar) = d {
let v = scalar.into_timestamptz();
println!("timestamp: {}", v.to_text());
println!("timestamp: {:?}", v);
}
}
}
Expand Down
Loading
Loading