Skip to content

Commit

Permalink
refine
Browse files Browse the repository at this point in the history
  • Loading branch information
ZENOTME committed Mar 26, 2024
1 parent 5bf0f5d commit e71fcd7
Showing 1 changed file with 48 additions and 37 deletions.
85 changes: 48 additions & 37 deletions src/common/src/array/arrow/arrow_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ pub trait ToArrowArrayWithTypeConvert {
ArrayImpl::Serial(array) => self.serial_to_arrow(data_type, array),
}
}

#[inline]
fn int16_to_arrow(
&self,
Expand All @@ -161,6 +162,7 @@ pub trait ToArrowArrayWithTypeConvert {
) -> Result<arrow_array::ArrayRef, ArrayError> {
Ok(Arc::new(arrow_array::Int16Array::from(array)))
}

#[inline]
fn int32_to_arrow(
&self,
Expand All @@ -169,6 +171,7 @@ pub trait ToArrowArrayWithTypeConvert {
) -> Result<arrow_array::ArrayRef, ArrayError> {
Ok(Arc::new(arrow_array::Int32Array::from(array)))
}

#[inline]
fn int64_to_arrow(
&self,
Expand All @@ -177,6 +180,7 @@ pub trait ToArrowArrayWithTypeConvert {
) -> Result<arrow_array::ArrayRef, ArrayError> {
Ok(Arc::new(arrow_array::Int64Array::from(array)))
}

#[inline]
fn float32_to_arrow(
&self,
Expand All @@ -185,6 +189,7 @@ pub trait ToArrowArrayWithTypeConvert {
) -> Result<arrow_array::ArrayRef, ArrayError> {
Ok(Arc::new(arrow_array::Float32Array::from(array)))
}

#[inline]
fn float64_to_arrow(
&self,
Expand All @@ -193,6 +198,7 @@ pub trait ToArrowArrayWithTypeConvert {
) -> Result<arrow_array::ArrayRef, ArrayError> {
Ok(Arc::new(arrow_array::Float64Array::from(array)))
}

#[inline]
fn utf8_to_arrow(
&self,
Expand All @@ -201,6 +207,7 @@ pub trait ToArrowArrayWithTypeConvert {
) -> Result<arrow_array::ArrayRef, ArrayError> {
Ok(Arc::new(arrow_array::StringArray::from(array)))
}

#[inline]
fn bool_to_arrow(
&self,
Expand All @@ -209,6 +216,8 @@ pub trait ToArrowArrayWithTypeConvert {
) -> Result<arrow_array::ArrayRef, ArrayError> {
Ok(Arc::new(arrow_array::BooleanArray::from(array)))
}

// Decimal values are stored as ASCII text representation in a large binary array.
#[inline]
fn decimal_to_arrow(
&self,
Expand All @@ -217,6 +226,7 @@ pub trait ToArrowArrayWithTypeConvert {
) -> Result<arrow_array::ArrayRef, ArrayError> {
Ok(Arc::new(arrow_array::LargeBinaryArray::from(array)))
}

#[inline]
fn int256_to_arrow(
&self,
Expand All @@ -225,6 +235,7 @@ pub trait ToArrowArrayWithTypeConvert {
) -> Result<arrow_array::ArrayRef, ArrayError> {
Ok(Arc::new(arrow_array::Decimal256Array::from(array)))
}

#[inline]
fn date_to_arrow(
&self,
Expand All @@ -233,6 +244,7 @@ pub trait ToArrowArrayWithTypeConvert {
) -> Result<arrow_array::ArrayRef, ArrayError> {
Ok(Arc::new(arrow_array::Date32Array::from(array)))
}

#[inline]
fn timestamp_to_arrow(
&self,
Expand All @@ -243,6 +255,7 @@ pub trait ToArrowArrayWithTypeConvert {
array,
)))
}

#[inline]
fn timestamptz_to_arrow(
&self,
Expand All @@ -253,6 +266,7 @@ pub trait ToArrowArrayWithTypeConvert {
arrow_array::TimestampMicrosecondArray::from(array).with_timezone_utc(),
))
}

#[inline]
fn time_to_arrow(
&self,
Expand All @@ -261,6 +275,7 @@ pub trait ToArrowArrayWithTypeConvert {
) -> Result<arrow_array::ArrayRef, ArrayError> {
Ok(Arc::new(arrow_array::Time64MicrosecondArray::from(array)))
}

#[inline]
fn interval_to_arrow(
&self,
Expand All @@ -271,6 +286,7 @@ pub trait ToArrowArrayWithTypeConvert {
array,
)))
}

#[inline]
fn struct_to_arrow(
&self,
Expand All @@ -279,6 +295,7 @@ pub trait ToArrowArrayWithTypeConvert {
) -> Result<arrow_array::ArrayRef, ArrayError> {
Ok(Arc::new(arrow_array::StructArray::try_from(array)?))
}

#[inline]
fn list_to_arrow(
&self,
Expand All @@ -287,6 +304,7 @@ pub trait ToArrowArrayWithTypeConvert {
) -> Result<arrow_array::ArrayRef, ArrayError> {
Ok(Arc::new(arrow_array::ListArray::try_from(array)?))
}

#[inline]
fn bytea_to_arrow(
&self,
Expand All @@ -295,6 +313,8 @@ pub trait ToArrowArrayWithTypeConvert {
) -> Result<arrow_array::ArrayRef, ArrayError> {
Ok(Arc::new(arrow_array::BinaryArray::from(array)))
}

// JSON values are stored as text representation in a large string array.
#[inline]
fn jsonb_to_arrow(
&self,
Expand All @@ -303,6 +323,7 @@ pub trait ToArrowArrayWithTypeConvert {
) -> Result<arrow_array::ArrayRef, ArrayError> {
Ok(Arc::new(arrow_array::LargeStringArray::from(array)))
}

#[inline]
fn serial_to_arrow(
&self,
Expand Down Expand Up @@ -338,46 +359,58 @@ pub trait ToArrowArrayConvert {
ArrayImpl::Serial(array) => self.serial_to_arrow(array),
}
}

#[inline]
fn int16_to_arrow(&self, array: &I16Array) -> Result<arrow_array::ArrayRef, ArrayError> {
Ok(Arc::new(arrow_array::Int16Array::from(array)))
}

#[inline]
fn int32_to_arrow(&self, array: &I32Array) -> Result<arrow_array::ArrayRef, ArrayError> {
Ok(Arc::new(arrow_array::Int32Array::from(array)))
}

#[inline]
fn int64_to_arrow(&self, array: &I64Array) -> Result<arrow_array::ArrayRef, ArrayError> {
Ok(Arc::new(arrow_array::Int64Array::from(array)))
}

#[inline]
fn float32_to_arrow(&self, array: &F32Array) -> Result<arrow_array::ArrayRef, ArrayError> {
Ok(Arc::new(arrow_array::Float32Array::from(array)))
}

#[inline]
fn float64_to_arrow(&self, array: &F64Array) -> Result<arrow_array::ArrayRef, ArrayError> {
Ok(Arc::new(arrow_array::Float64Array::from(array)))
}

#[inline]
fn utf8_to_arrow(&self, array: &Utf8Array) -> Result<arrow_array::ArrayRef, ArrayError> {
Ok(Arc::new(arrow_array::StringArray::from(array)))
}

#[inline]
fn bool_to_arrow(&self, array: &BoolArray) -> Result<arrow_array::ArrayRef, ArrayError> {
Ok(Arc::new(arrow_array::BooleanArray::from(array)))
}

// Decimal values are stored as ASCII text representation in a large binary array.
#[inline]
fn decimal_to_arrow(&self, array: &DecimalArray) -> Result<arrow_array::ArrayRef, ArrayError> {
Ok(Arc::new(arrow_array::LargeBinaryArray::from(array)))
}

#[inline]
fn int256_to_arrow(&self, array: &Int256Array) -> Result<arrow_array::ArrayRef, ArrayError> {
Ok(Arc::new(arrow_array::Decimal256Array::from(array)))
}

#[inline]
fn date_to_arrow(&self, array: &DateArray) -> Result<arrow_array::ArrayRef, ArrayError> {
Ok(Arc::new(arrow_array::Date32Array::from(array)))
}

#[inline]
fn timestamp_to_arrow(
&self,
Expand All @@ -387,6 +420,7 @@ pub trait ToArrowArrayConvert {
array,
)))
}

#[inline]
fn timestamptz_to_arrow(
&self,
Expand All @@ -396,10 +430,12 @@ pub trait ToArrowArrayConvert {
arrow_array::TimestampMicrosecondArray::from(array).with_timezone_utc(),
))
}

#[inline]
fn time_to_arrow(&self, array: &TimeArray) -> Result<arrow_array::ArrayRef, ArrayError> {
Ok(Arc::new(arrow_array::Time64MicrosecondArray::from(array)))
}

#[inline]
fn interval_to_arrow(
&self,
Expand All @@ -409,22 +445,28 @@ pub trait ToArrowArrayConvert {
array,
)))
}

#[inline]
fn struct_to_arrow(&self, array: &StructArray) -> Result<arrow_array::ArrayRef, ArrayError> {
Ok(Arc::new(arrow_array::StructArray::try_from(array)?))
}

#[inline]
fn list_to_arrow(&self, array: &ListArray) -> Result<arrow_array::ArrayRef, ArrayError> {
Ok(Arc::new(arrow_array::ListArray::try_from(array)?))
}

#[inline]
fn bytea_to_arrow(&self, array: &BytesArray) -> Result<arrow_array::ArrayRef, ArrayError> {
Ok(Arc::new(arrow_array::BinaryArray::from(array)))
}

// JSON values are stored as text representation in a large string array.
#[inline]
fn jsonb_to_arrow(&self, array: &JsonbArray) -> Result<arrow_array::ArrayRef, ArrayError> {
Ok(Arc::new(arrow_array::LargeStringArray::from(array)))
}

#[inline]
fn serial_to_arrow(&self, _array: &SerialArray) -> Result<arrow_array::ArrayRef, ArrayError> {
todo!("serial type is not supported to convert to arrow")
Expand Down Expand Up @@ -572,7 +614,6 @@ pub trait ToArrowTypeConvert {

struct DefaultArrowConvert;
impl ToArrowArrayConvert for DefaultArrowConvert {}
static DEFAULT_ARROW_CONVERT: DefaultArrowConvert = DefaultArrowConvert;

/// Implement bi-directional `From` between `ArrayImpl` and `arrow_array::ArrayRef`.
macro_rules! converts_generic {
Expand All @@ -581,7 +622,7 @@ macro_rules! converts_generic {
impl TryFrom<&ArrayImpl> for arrow_array::ArrayRef {
type Error = ArrayError;
fn try_from(array: &ArrayImpl) -> Result<Self, Self::Error> {
DEFAULT_ARROW_CONVERT.to_arrow(array)
DefaultArrowConvert{}.to_arrow(array)
}
}
// Arrow array -> RisingWave array
Expand Down Expand Up @@ -702,45 +743,15 @@ impl From<arrow_schema::DataType> for DataType {
}
}

struct DefaultArrowTypeConvert;

impl ToArrowTypeConvert for DefaultArrowTypeConvert {}

impl TryFrom<&DataType> for arrow_schema::DataType {
type Error = ArrayError;

fn try_from(value: &DataType) -> Result<Self, Self::Error> {
match value {
DataType::Boolean => Ok(Self::Boolean),
DataType::Int16 => Ok(Self::Int16),
DataType::Int32 => Ok(Self::Int32),
DataType::Int64 => Ok(Self::Int64),
DataType::Int256 => Ok(Self::Decimal256(arrow_schema::DECIMAL256_MAX_PRECISION, 0)),
DataType::Float32 => Ok(Self::Float32),
DataType::Float64 => Ok(Self::Float64),
DataType::Date => Ok(Self::Date32),
DataType::Timestamp => Ok(Self::Timestamp(arrow_schema::TimeUnit::Microsecond, None)),
DataType::Timestamptz => Ok(Self::Timestamp(
arrow_schema::TimeUnit::Microsecond,
Some("+00:00".into()),
)),
DataType::Time => Ok(Self::Time64(arrow_schema::TimeUnit::Microsecond)),
DataType::Interval => Ok(Self::Interval(arrow_schema::IntervalUnit::MonthDayNano)),
DataType::Varchar => Ok(Self::Utf8),
DataType::Jsonb => Ok(Self::LargeUtf8),
DataType::Bytea => Ok(Self::Binary),
DataType::Decimal => Ok(Self::LargeBinary),
DataType::Struct(struct_type) => Ok(Self::Struct(
struct_type
.iter()
.map(|(name, ty)| Ok(arrow_schema::Field::new(name, ty.try_into()?, true)))
.try_collect::<_, _, ArrayError>()?,
)),
DataType::List(datatype) => Ok(Self::List(Arc::new(arrow_schema::Field::new(
"item",
datatype.as_ref().try_into()?,
true,
)))),
DataType::Serial => Err(ArrayError::to_arrow(
"Serial type is not supported to convert to arrow",
)),
}
DefaultArrowTypeConvert {}.to_arrow_type(value)
}
}

Expand Down

0 comments on commit e71fcd7

Please sign in to comment.