Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(types): simplify ArrayImpl::from_protobuf #17763

Merged
merged 1 commit into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/common/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ mod stream_chunk_iter;
pub mod stream_record;
pub mod struct_array;
mod utf8_array;
mod value_reader;

use std::convert::From;
use std::hash::{Hash, Hasher};
Expand Down
16 changes: 14 additions & 2 deletions src/common/src/array/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
// limitations under the License.

use std::fmt::Debug;
use std::io::Write;
use std::io::{Cursor, Write};
use std::mem::size_of;

use anyhow::Context;
use byteorder::{BigEndian, ReadBytesExt};
use risingwave_common_estimate_size::{EstimateSize, ZeroHeapSize};
use risingwave_pb::common::buffer::CompressionType;
use risingwave_pb::common::Buffer;
Expand Down Expand Up @@ -50,6 +52,7 @@ where

// item methods
fn to_protobuf<T: Write>(self, output: &mut T) -> ArrayResult<usize>;
fn from_protobuf(cur: &mut Cursor<&[u8]>) -> ArrayResult<Self>;
}

macro_rules! impl_array_methods {
Expand Down Expand Up @@ -81,14 +84,20 @@ macro_rules! impl_array_methods {
}

macro_rules! impl_primitive_for_native_types {
($({ $naive_type:ty, $scalar_type:ident } ),*) => {
($({ $naive_type:ty, $scalar_type:ident, $read_fn:ident } ),*) => {
$(
impl PrimitiveArrayItemType for $naive_type {
impl_array_methods!($naive_type, $scalar_type, $scalar_type);

/// Serializes `self` into `output` by delegating to `NativeType::to_protobuf`.
///
/// NOTE(review): the returned `usize` is presumably the number of bytes
/// written — confirm against the `NativeType` implementation.
fn to_protobuf<T: Write>(self, output: &mut T) -> ArrayResult<usize> {
NativeType::to_protobuf(self, output)
}
/// Decodes one value from `cur` and advances the cursor past it.
///
/// The raw value is read with the `$read_fn` reader supplied to the macro,
/// using big-endian byte order, and is then converted into `Self` via `.into()`.
///
/// # Errors
///
/// Returns an error carrying the context "failed to read value from buffer"
/// when the underlying read fails.
fn from_protobuf(cur: &mut Cursor<&[u8]>) -> ArrayResult<Self> {
let v = cur
.$read_fn::<BigEndian>()
.context("failed to read value from buffer")?;
Ok(v.into())
}
}
)*
}
Expand All @@ -106,6 +115,9 @@ macro_rules! impl_primitive_for_others {
/// Serializes `self` into `output` by delegating to the `to_protobuf`
/// function defined on `$scalar_type` itself.
fn to_protobuf<T: Write>(self, output: &mut T) -> ArrayResult<usize> {
<$scalar_type>::to_protobuf(self, output)
}
/// Decodes one value from `cur` by delegating to the `from_protobuf`
/// function defined on `$scalar_type` itself; any error it returns is
/// passed through unchanged.
fn from_protobuf(cur: &mut Cursor<&[u8]>) -> ArrayResult<Self> {
<$scalar_type>::from_protobuf(cur)
}
}
)*
}
Expand Down
154 changes: 48 additions & 106 deletions src/common/src/array/proto_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,43 +16,32 @@ use std::io::{Cursor, Read};

use anyhow::Context;
use byteorder::{BigEndian, ReadBytesExt};
use paste::paste;
use risingwave_pb::data::PbArrayType;

use super::*;
use crate::array::value_reader::{PrimitiveValueReader, VarSizedValueReader};

impl ArrayImpl {
pub fn from_protobuf(array: &PbArray, cardinality: usize) -> ArrayResult<Self> {
use crate::array::value_reader::*;
let array = match array.array_type() {
PbArrayType::Unspecified => unreachable!(),
PbArrayType::Int16 => read_numeric_array::<i16, I16ValueReader>(array, cardinality)?,
PbArrayType::Int32 => read_numeric_array::<i32, I32ValueReader>(array, cardinality)?,
PbArrayType::Int64 => read_numeric_array::<i64, I64ValueReader>(array, cardinality)?,
PbArrayType::Serial => {
read_numeric_array::<Serial, SerialValueReader>(array, cardinality)?
}
PbArrayType::Float32 => read_numeric_array::<F32, F32ValueReader>(array, cardinality)?,
PbArrayType::Float64 => read_numeric_array::<F64, F64ValueReader>(array, cardinality)?,
PbArrayType::Int16 => read_primitive_array::<i16>(array, cardinality)?,
PbArrayType::Int32 => read_primitive_array::<i32>(array, cardinality)?,
PbArrayType::Int64 => read_primitive_array::<i64>(array, cardinality)?,
PbArrayType::Serial => read_primitive_array::<Serial>(array, cardinality)?,
PbArrayType::Float32 => read_primitive_array::<F32>(array, cardinality)?,
PbArrayType::Float64 => read_primitive_array::<F64>(array, cardinality)?,
PbArrayType::Bool => read_bool_array(array, cardinality)?,
PbArrayType::Utf8 => {
read_string_array::<Utf8ArrayBuilder, Utf8ValueReader>(array, cardinality)?
}
PbArrayType::Decimal => {
read_numeric_array::<Decimal, DecimalValueReader>(array, cardinality)?
}
PbArrayType::Date => read_date_array(array, cardinality)?,
PbArrayType::Time => read_time_array(array, cardinality)?,
PbArrayType::Timestamp => read_timestamp_array(array, cardinality)?,
PbArrayType::Timestamptz => read_timestamptz_array(array, cardinality)?,
PbArrayType::Interval => read_interval_array(array, cardinality)?,
PbArrayType::Utf8 => read_string_array::<Utf8ValueReader>(array, cardinality)?,
PbArrayType::Decimal => read_primitive_array::<Decimal>(array, cardinality)?,
PbArrayType::Date => read_primitive_array::<Date>(array, cardinality)?,
PbArrayType::Time => read_primitive_array::<Time>(array, cardinality)?,
PbArrayType::Timestamp => read_primitive_array::<Timestamp>(array, cardinality)?,
PbArrayType::Timestamptz => read_primitive_array::<Timestamptz>(array, cardinality)?,
PbArrayType::Interval => read_primitive_array::<Interval>(array, cardinality)?,
PbArrayType::Jsonb => JsonbArray::from_protobuf(array)?,
PbArrayType::Struct => StructArray::from_protobuf(array)?,
PbArrayType::List => ListArray::from_protobuf(array)?,
PbArrayType::Bytea => {
read_string_array::<BytesArrayBuilder, BytesValueReader>(array, cardinality)?
}
PbArrayType::Bytea => read_string_array::<BytesValueReader>(array, cardinality)?,
PbArrayType::Int256 => Int256Array::from_protobuf(array, cardinality)?,
};
Ok(array)
Expand All @@ -62,7 +51,7 @@ impl ArrayImpl {
// TODO: Use techniques like Apache Arrow Flight RPC to eliminate deserialization.
// https://arrow.apache.org/docs/format/Flight.html

fn read_numeric_array<T: PrimitiveArrayItemType, R: PrimitiveValueReader<T>>(
fn read_primitive_array<T: PrimitiveArrayItemType>(
array: &PbArray,
cardinality: usize,
) -> ArrayResult<ArrayImpl> {
Expand All @@ -78,7 +67,7 @@ fn read_numeric_array<T: PrimitiveArrayItemType, R: PrimitiveValueReader<T>>(
let mut cursor = Cursor::new(buf);
for not_null in bitmap.iter() {
if not_null {
let v = R::read(&mut cursor)?;
let v = T::from_protobuf(&mut cursor)?;
builder.append(Some(v));
} else {
builder.append(None);
Expand All @@ -105,99 +94,52 @@ fn read_bool_array(array: &PbArray, cardinality: usize) -> ArrayResult<ArrayImpl
Ok(arr.into())
}

fn read_date(cursor: &mut Cursor<&[u8]>) -> ArrayResult<Date> {
let days = cursor
.read_i32::<BigEndian>()
.context("failed to read i32 from Date buffer")?;

Ok(Date::with_days(days)?)
}

fn read_time(cursor: &mut Cursor<&[u8]>) -> ArrayResult<Time> {
let nano = cursor
.read_u64::<BigEndian>()
.context("failed to read u64 from Time buffer")?;

Ok(Time::with_nano(nano)?)
}

fn read_timestamp(cursor: &mut Cursor<&[u8]>) -> ArrayResult<Timestamp> {
let micros = cursor
fn read_offset(offset_cursor: &mut Cursor<&[u8]>) -> ArrayResult<i64> {
let offset = offset_cursor
.read_i64::<BigEndian>()
.context("failed to read i64 from Timestamp buffer")?;
.context("failed to read i64 from offset buffer")?;

Ok(Timestamp::with_micros(micros)?)
Ok(offset)
}

fn read_timestamptz(cursor: &mut Cursor<&[u8]>) -> ArrayResult<Timestamptz> {
let micros = cursor
.read_i64::<BigEndian>()
.context("failed to read i64 from Timestamptz buffer")?;

Timestamptz::from_protobuf(micros)
/// Describes how to decode variable-sized values into an array builder.
///
/// An implementor is passed as the type parameter `R` of `read_string_array`,
/// which obtains its builder through `R::new_builder`.
trait VarSizedValueReader {
/// The array builder type that decoded values are appended to.
type AB: ArrayBuilder;
/// Creates a builder of type `Self::AB` from the given `capacity`.
fn new_builder(capacity: usize) -> Self::AB;
/// Decodes the bytes in `buf` and appends the result to `builder`.
/// The implementors in this file append exactly one non-null value per call.
fn read(buf: &[u8], builder: &mut Self::AB) -> ArrayResult<()>;
}

fn read_interval(cursor: &mut Cursor<&[u8]>) -> ArrayResult<Interval> {
let mut read = || {
let months = cursor.read_i32::<BigEndian>()?;
let days = cursor.read_i32::<BigEndian>()?;
let usecs = cursor.read_i64::<BigEndian>()?;
struct Utf8ValueReader;

Ok::<_, std::io::Error>(Interval::from_month_day_usec(months, days, usecs))
};
impl VarSizedValueReader for Utf8ValueReader {
type AB = Utf8ArrayBuilder;

Ok(read().context("failed to read Interval from buffer")?)
}

macro_rules! read_one_value_array {
($({ $type:ident, $builder:ty }),*) => {
paste! {
$(
fn [<read_ $type:snake _array>](array: &PbArray, cardinality: usize) -> ArrayResult<ArrayImpl> {
ensure!(
array.get_values().len() == 1,
"Must have only 1 buffer in a {} array", stringify!($type)
);
fn new_builder(capacity: usize) -> Self::AB {
Utf8ArrayBuilder::new(capacity)
}

let buf = array.get_values()[0].get_body().as_slice();

let mut builder = $builder::new(cardinality);
let bitmap: Bitmap = array.get_null_bitmap()?.into();
let mut cursor = Cursor::new(buf);
for not_null in bitmap.iter() {
if not_null {
builder.append(Some([<read_ $type:snake>](&mut cursor)?));
} else {
builder.append(None);
}
}
let arr = builder.finish();
ensure_eq!(arr.len(), cardinality);

Ok(arr.into())
}
)*
}
};
fn read(buf: &[u8], builder: &mut Utf8ArrayBuilder) -> ArrayResult<()> {
let s = std::str::from_utf8(buf).context("failed to read utf8 string from bytes")?;
builder.append(Some(s));
Ok(())
}
}

read_one_value_array! {
{ Interval, IntervalArrayBuilder },
{ Date, DateArrayBuilder },
{ Time, TimeArrayBuilder },
{ Timestamp, TimestampArrayBuilder },
{ Timestamptz, TimestamptzArrayBuilder }
}
struct BytesValueReader;

fn read_offset(offset_cursor: &mut Cursor<&[u8]>) -> ArrayResult<i64> {
let offset = offset_cursor
.read_i64::<BigEndian>()
.context("failed to read i64 from offset buffer")?;
impl VarSizedValueReader for BytesValueReader {
type AB = BytesArrayBuilder;

Ok(offset)
fn new_builder(capacity: usize) -> Self::AB {
BytesArrayBuilder::new(capacity)
}

fn read(buf: &[u8], builder: &mut BytesArrayBuilder) -> ArrayResult<()> {
builder.append(Some(buf));
Ok(())
}
}

fn read_string_array<B: ArrayBuilder, R: VarSizedValueReader<B>>(
fn read_string_array<R: VarSizedValueReader>(
array: &PbArray,
cardinality: usize,
) -> ArrayResult<ArrayImpl> {
Expand All @@ -208,7 +150,7 @@ fn read_string_array<B: ArrayBuilder, R: VarSizedValueReader<B>>(
let offset_buff = array.get_values()[0].get_body().as_slice();
let data_buf = array.get_values()[1].get_body().as_slice();

let mut builder = B::new(cardinality);
let mut builder = R::new_builder(cardinality);
let bitmap: Bitmap = array.get_null_bitmap()?.into();
let mut offset_cursor = Cursor::new(offset_buff);
let mut data_cursor = Cursor::new(data_buf);
Expand Down
88 changes: 0 additions & 88 deletions src/common/src/array/value_reader.rs

This file was deleted.

Loading
Loading