From 9d465e5842b5f19d02138caff8d8796d605f89ac Mon Sep 17 00:00:00 2001 From: xxchan Date: Sat, 20 Jul 2024 10:54:06 +0800 Subject: [PATCH] refactor(types): simplify ArrayImpl::from_protobuf --- src/common/src/array/mod.rs | 1 - src/common/src/array/primitive_array.rs | 16 ++- src/common/src/array/proto_reader.rs | 154 ++++++++---------------- src/common/src/array/value_reader.rs | 88 -------------- src/common/src/types/datetime.rs | 28 ++++- src/common/src/types/interval.rs | 15 ++- src/common/src/types/mod.rs | 12 +- src/common/src/types/scalar_impl.rs | 2 +- src/common/src/types/timestamptz.rs | 11 +- 9 files changed, 118 insertions(+), 209 deletions(-) delete mode 100644 src/common/src/array/value_reader.rs diff --git a/src/common/src/array/mod.rs b/src/common/src/array/mod.rs index f2ae95aa71efb..89b3b06266786 100644 --- a/src/common/src/array/mod.rs +++ b/src/common/src/array/mod.rs @@ -35,7 +35,6 @@ mod stream_chunk_iter; pub mod stream_record; pub mod struct_array; mod utf8_array; -mod value_reader; use std::convert::From; use std::hash::{Hash, Hasher}; diff --git a/src/common/src/array/primitive_array.rs b/src/common/src/array/primitive_array.rs index 9ae7912e9887e..45e1bc0ddd847 100644 --- a/src/common/src/array/primitive_array.rs +++ b/src/common/src/array/primitive_array.rs @@ -13,9 +13,11 @@ // limitations under the License. use std::fmt::Debug; -use std::io::Write; +use std::io::{Cursor, Write}; use std::mem::size_of; +use anyhow::Context; +use byteorder::{BigEndian, ReadBytesExt}; use risingwave_common_estimate_size::{EstimateSize, ZeroHeapSize}; use risingwave_pb::common::buffer::CompressionType; use risingwave_pb::common::Buffer; @@ -50,6 +52,7 @@ where // item methods fn to_protobuf(self, output: &mut T) -> ArrayResult; + fn from_protobuf(cur: &mut Cursor<&[u8]>) -> ArrayResult; } macro_rules! impl_array_methods { @@ -81,7 +84,7 @@ macro_rules! impl_array_methods { } macro_rules! impl_primitive_for_native_types { - ($({ $naive_type:ty, $scalar_type:ident } ),*) => { + ($({ $naive_type:ty, $scalar_type:ident, $read_fn:ident } ),*) => { $( impl PrimitiveArrayItemType for $naive_type { impl_array_methods!($naive_type, $scalar_type, $scalar_type); @@ -89,6 +92,12 @@ macro_rules! impl_primitive_for_native_types { fn to_protobuf(self, output: &mut T) -> ArrayResult { NativeType::to_protobuf(self, output) } + fn from_protobuf(cur: &mut Cursor<&[u8]>) -> ArrayResult { + let v = cur + .$read_fn::() + .context("failed to read value from buffer")?; + Ok(v.into()) + } } )* } @@ -106,6 +115,9 @@ macro_rules! impl_primitive_for_others { fn to_protobuf(self, output: &mut T) -> ArrayResult { <$scalar_type>::to_protobuf(self, output) } + fn from_protobuf(cur: &mut Cursor<&[u8]>) -> ArrayResult { + <$scalar_type>::from_protobuf(cur) + } } )* } diff --git a/src/common/src/array/proto_reader.rs b/src/common/src/array/proto_reader.rs index 7c3b05437770c..aa296900190df 100644 --- a/src/common/src/array/proto_reader.rs +++ b/src/common/src/array/proto_reader.rs @@ -16,43 +16,32 @@ use std::io::{Cursor, Read}; use anyhow::Context; use byteorder::{BigEndian, ReadBytesExt}; -use paste::paste; use risingwave_pb::data::PbArrayType; use super::*; -use crate::array::value_reader::{PrimitiveValueReader, VarSizedValueReader}; impl ArrayImpl { pub fn from_protobuf(array: &PbArray, cardinality: usize) -> ArrayResult { - use crate::array::value_reader::*; let array = match array.array_type() { PbArrayType::Unspecified => unreachable!(), - PbArrayType::Int16 => read_numeric_array::(array, cardinality)?, - PbArrayType::Int32 => read_numeric_array::(array, cardinality)?, - PbArrayType::Int64 => read_numeric_array::(array, cardinality)?, - PbArrayType::Serial => { - read_numeric_array::(array, cardinality)? - } - PbArrayType::Float32 => read_numeric_array::(array, cardinality)?, - PbArrayType::Float64 => read_numeric_array::(array, cardinality)?, + PbArrayType::Int16 => read_primitive_array::(array, cardinality)?, + PbArrayType::Int32 => read_primitive_array::(array, cardinality)?, + PbArrayType::Int64 => read_primitive_array::(array, cardinality)?, + PbArrayType::Serial => read_primitive_array::(array, cardinality)?, + PbArrayType::Float32 => read_primitive_array::(array, cardinality)?, + PbArrayType::Float64 => read_primitive_array::(array, cardinality)?, PbArrayType::Bool => read_bool_array(array, cardinality)?, - PbArrayType::Utf8 => { - read_string_array::(array, cardinality)? - } - PbArrayType::Decimal => { - read_numeric_array::(array, cardinality)? - } - PbArrayType::Date => read_date_array(array, cardinality)?, - PbArrayType::Time => read_time_array(array, cardinality)?, - PbArrayType::Timestamp => read_timestamp_array(array, cardinality)?, - PbArrayType::Timestamptz => read_timestamptz_array(array, cardinality)?, - PbArrayType::Interval => read_interval_array(array, cardinality)?, + PbArrayType::Utf8 => read_string_array::(array, cardinality)?, + PbArrayType::Decimal => read_primitive_array::(array, cardinality)?, + PbArrayType::Date => read_primitive_array::(array, cardinality)?, + PbArrayType::Time => read_primitive_array::