diff --git a/src/batch/src/executor/aggregation/orderby.rs b/src/batch/src/executor/aggregation/orderby.rs index 427fa15688ca3..7f5cde312818e 100644 --- a/src/batch/src/executor/aggregation/orderby.rs +++ b/src/batch/src/executor/aggregation/orderby.rs @@ -172,15 +172,7 @@ mod tests { agg.update(&mut state, &chunk).await.unwrap(); assert_eq!( agg.get_result(&state).await.unwrap(), - Some( - ListValue::new(vec![ - Some(789.into()), - Some(456.into()), - Some(123.into()), - Some(321.into()), - ]) - .into() - ) + Some(ListValue::from_iter([789, 456, 123, 321]).into()) ); } diff --git a/src/batch/src/executor/filter.rs b/src/batch/src/executor/filter.rs index b9f98171e09dc..708082f024d48 100644 --- a/src/batch/src/executor/filter.rs +++ b/src/batch/src/executor/filter.rs @@ -133,17 +133,15 @@ mod tests { #[tokio::test] async fn test_list_filter_executor() { - use risingwave_common::array::{ArrayBuilder, ListArrayBuilder, ListRef, ListValue}; + use risingwave_common::array::{ArrayBuilder, ListArrayBuilder, ListValue}; use risingwave_common::types::Scalar; let mut builder = ListArrayBuilder::with_type(4, DataType::List(Box::new(DataType::Int32))); // Add 4 ListValues to ArrayBuilder - (1..=4).for_each(|i| { - builder.append(Some(ListRef::ValueRef { - val: &ListValue::new(vec![Some(i.to_scalar_value())]), - })); - }); + for i in 1..=4 { + builder.append(Some(ListValue::from_iter([i]).as_scalar_ref())); + } // Use builder to obtain a single (List) column DataChunk let chunk = DataChunk::new(vec![builder.finish().into_ref()], 4); @@ -181,15 +179,11 @@ mod tests { // Assert that values 3 and 4 are bigger than 2 assert_eq!( col1.value_at(0), - Some(ListRef::ValueRef { - val: &ListValue::new(vec![Some(3.to_scalar_value())]), - }) + Some(ListValue::from_iter([3]).as_scalar_ref()) ); assert_eq!( col1.value_at(1), - Some(ListRef::ValueRef { - val: &ListValue::new(vec![Some(4.to_scalar_value())]), - }) + Some(ListValue::from_iter([4]).as_scalar_ref()) ); } let res = stream.next().await; diff --git a/src/batch/src/executor/order_by.rs b/src/batch/src/executor/order_by.rs index d78085b2ed17e..112247bbca127 100644 --- a/src/batch/src/executor/order_by.rs +++ b/src/batch/src/executor/order_by.rs @@ -613,17 +613,11 @@ mod tests { }, { list_builder.append(None); - list_builder.append(Some(ListRef::ValueRef { - val: &ListValue::new(vec![ - Some(1i64.to_scalar_value()), - None, - Some(3i64.to_scalar_value()), - ]), - })); + list_builder.append(Some( + ListValue::from_iter([Some(1i64), None, Some(3i64)]).as_scalar_ref(), + )); list_builder.append(None); - list_builder.append(Some(ListRef::ValueRef { - val: &ListValue::new(vec![Some(2i64.to_scalar_value())]), - })); + list_builder.append(Some(ListValue::from_iter([2i64]).as_scalar_ref())); list_builder.append(None); list_builder.finish().into_ref() }, @@ -675,16 +669,10 @@ mod tests { }, { list_builder.append(None); - list_builder.append(Some(ListRef::ValueRef { - val: &ListValue::new(vec![Some(2i64.to_scalar_value())]), - })); - list_builder.append(Some(ListRef::ValueRef { - val: &ListValue::new(vec![ - Some(1i64.to_scalar_value()), - None, - Some(3i64.to_scalar_value()), - ]), - })); + list_builder.append(Some(ListValue::from_iter([2i64]).as_scalar_ref())); + list_builder.append(Some( + ListValue::from_iter([Some(1i64), None, Some(3i64)]).as_scalar_ref(), + )); list_builder.append(None); list_builder.append(None); list_builder.finish().into_ref() diff --git a/src/common/benches/bench_encoding.rs b/src/common/benches/bench_encoding.rs index ffa4005cb8c32..9d091c279b239 100644 --- a/src/common/benches/bench_encoding.rs +++ b/src/common/benches/bench_encoding.rs @@ -124,7 +124,7 @@ fn bench_encoding(c: &mut Criterion) { Case::new( "List of Bool (len = 100)", DataType::List(Box::new(DataType::Boolean)), - ScalarImpl::List(ListValue::new(vec![Some(ScalarImpl::Bool(true)); 100])), + ScalarImpl::List(ListValue::from_iter([true; 100])), ), ]; diff --git a/src/common/src/array/arrow.rs b/src/common/src/array/arrow.rs index 36336ee5f819b..b2b197532f202 100644 --- a/src/common/src/array/arrow.rs +++ b/src/common/src/array/arrow.rs @@ -674,11 +674,14 @@ impl TryFrom<&arrow_array::ListArray> for ListArray { type Error = ArrayError; fn try_from(array: &arrow_array::ListArray) -> Result { - let iter: Vec<_> = array - .iter() - .map(|o| o.map(|a| ArrayImpl::try_from(&a)).transpose()) - .try_collect()?; - Ok(ListArray::from_iter(iter, (&array.value_type()).into())) + Ok(ListArray { + value: Box::new(ArrayImpl::try_from(array.values())?), + bitmap: match array.nulls() { + Some(nulls) => nulls.iter().collect(), + None => Bitmap::ones(array.len()), + }, + offsets: array.offsets().iter().map(|o| *o as u32).collect(), + }) } } @@ -904,15 +907,7 @@ mod tests { #[test] fn list() { - let array = ListArray::from_iter( - [ - Some(I32Array::from_iter([None, Some(-7), Some(25)]).into()), - None, - Some(I32Array::from_iter([Some(0), Some(-127), Some(127), Some(50)]).into()), - Some(I32Array::from_iter([0; 0]).into()), - ], - DataType::Int32, - ); + let array = ListArray::from_iter([None, Some(vec![0, -127, 127, 50]), Some(vec![0; 0])]); let arrow = arrow_array::ListArray::from(&array); assert_eq!(ListArray::try_from(&arrow).unwrap(), array); } diff --git a/src/common/src/array/bool_array.rs b/src/common/src/array/bool_array.rs index fb12bb819fffd..2512b4c34c934 100644 --- a/src/common/src/array/bool_array.rs +++ b/src/common/src/array/bool_array.rs @@ -125,7 +125,7 @@ impl Array for BoolArray { } /// `BoolArrayBuilder` constructs a `BoolArray` from `Option`. -#[derive(Debug)] +#[derive(Debug, Clone, EstimateSize)] pub struct BoolArrayBuilder { bitmap: BitmapBuilder, data: BitmapBuilder, diff --git a/src/common/src/array/bytes_array.rs b/src/common/src/array/bytes_array.rs index bc0209f149af3..9aa3a1128244f 100644 --- a/src/common/src/array/bytes_array.rs +++ b/src/common/src/array/bytes_array.rs @@ -25,19 +25,11 @@ use crate::estimate_size::EstimateSize; use crate::util::iter_util::ZipEqDebug; /// `BytesArray` is a collection of Rust `[u8]`s. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, EstimateSize)] pub struct BytesArray { - offset: Vec, + offset: Box<[u32]>, bitmap: Bitmap, - data: Vec, -} - -impl EstimateSize for BytesArray { - fn estimated_heap_size(&self) -> usize { - self.offset.capacity() * size_of::() - + self.bitmap.estimated_heap_size() - + self.data.capacity() - } + data: Box<[u8]>, } impl Array for BytesArray { @@ -86,7 +78,7 @@ impl Array for BytesArray { }, Buffer { compression: CompressionType::None as i32, - body: data_buffer, + body: data_buffer.into(), }, ]; let null_bitmap = self.null_bitmap().to_protobuf(); @@ -146,7 +138,7 @@ impl<'a> FromIterator<&'a [u8]> for BytesArray { } /// `BytesArrayBuilder` use `&[u8]` to build an `BytesArray`. -#[derive(Debug)] +#[derive(Debug, Clone, EstimateSize)] pub struct BytesArrayBuilder { offset: Vec, bitmap: BitmapBuilder, @@ -221,9 +213,9 @@ impl ArrayBuilder for BytesArrayBuilder { fn finish(self) -> BytesArray { BytesArray { - bitmap: (self.bitmap).finish(), - data: self.data, - offset: self.offset, + bitmap: self.bitmap.finish(), + data: self.data.into(), + offset: self.offset.into(), } } } diff --git a/src/common/src/array/data_chunk.rs b/src/common/src/array/data_chunk.rs index 7cdeeeee4e2a6..b8cd84dbcc932 100644 --- a/src/common/src/array/data_chunk.rs +++ b/src/common/src/array/data_chunk.rs @@ -1114,7 +1114,7 @@ mod tests { #[test] fn test_chunk_estimated_size() { assert_eq!( - 96, + 72, DataChunk::from_pretty( "I I I 1 5 2 @@ -1124,7 +1124,7 @@ mod tests { .estimated_heap_size() ); assert_eq!( - 64, + 48, DataChunk::from_pretty( "I I 1 2 diff --git a/src/common/src/array/jsonb_array.rs b/src/common/src/array/jsonb_array.rs index 3c4ca23fff04e..1368bd46603b2 100644 --- a/src/common/src/array/jsonb_array.rs +++ b/src/common/src/array/jsonb_array.rs @@ -19,13 +19,13 @@ use crate::buffer::{Bitmap, BitmapBuilder}; use crate::estimate_size::EstimateSize; use crate::types::{DataType, JsonbRef, JsonbVal, Scalar}; -#[derive(Debug)] +#[derive(Debug, Clone, EstimateSize)] pub struct JsonbArrayBuilder { bitmap: BitmapBuilder, builder: jsonbb::Builder, } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, EstimateSize)] pub struct JsonbArray { bitmap: Bitmap, /// Elements are stored as a single JSONB array value. @@ -177,8 +177,14 @@ impl FromIterator for JsonbArray { } } -impl EstimateSize for JsonbArray { +impl EstimateSize for jsonbb::Value { fn estimated_heap_size(&self) -> usize { - self.bitmap.estimated_heap_size() + self.data.capacity() + self.capacity() + } +} + +impl EstimateSize for jsonbb::Builder { + fn estimated_heap_size(&self) -> usize { + self.capacity() } } diff --git a/src/common/src/array/list_array.rs b/src/common/src/array/list_array.rs index b411a092e445e..fcd1648f13b9f 100644 --- a/src/common/src/array/list_array.rs +++ b/src/common/src/array/list_array.rs @@ -14,20 +14,19 @@ use std::borrow::Cow; use std::cmp::Ordering; -use std::fmt; -use std::fmt::Debug; +use std::fmt::{self, Debug, Display}; use std::future::Future; -use std::hash::Hash; use std::mem::size_of; -use std::ops::{Index, IndexMut}; use bytes::{Buf, BufMut}; -use either::Either; use itertools::Itertools; use risingwave_pb::data::{ListArrayData, PbArray, PbArrayType}; use serde::{Deserialize, Serializer}; -use super::{Array, ArrayBuilder, ArrayBuilderImpl, ArrayImpl, ArrayResult, RowRef}; +use super::{ + Array, ArrayBuilder, ArrayBuilderImpl, ArrayImpl, ArrayResult, BoolArray, PrimitiveArray, + PrimitiveArrayItemType, RowRef, Utf8Array, +}; use crate::buffer::{Bitmap, BitmapBuilder}; use crate::estimate_size::EstimateSize; use crate::row::Row; @@ -38,32 +37,11 @@ use crate::types::{ use crate::util::memcmp_encoding; use crate::util::value_encoding::estimate_serialize_datum_size; -macro_rules! iter_elems_ref { - ($self:expr, $it:ident, { $($body:tt)* }) => { - iter_elems_ref!($self, $it, { $($body)* }, { $($body)* }) - }; - - ($self:expr, $it:ident, { $($l_body:tt)* }, { $($r_body:tt)* }) => { - match $self { - ListRef::Indexed { arr, idx } => { - // SAFETY: `offsets` and `value` are both generated by the array builder, so they are always valid. - let $it = (arr.offsets[idx]..arr.offsets[idx + 1]).map(|o| unsafe { arr.value.value_at_unchecked(o as usize) }); - $($l_body)* - } - ListRef::ValueRef { val } => { - let $it = val.values.iter().map(ToDatumRef::to_datum_ref); - $($r_body)* - } - } - }; -} - -#[derive(Debug)] +#[derive(Debug, Clone, EstimateSize)] pub struct ListArrayBuilder { bitmap: BitmapBuilder, offsets: Vec, value: Box, - value_type: DataType, len: usize, } @@ -72,7 +50,7 @@ impl ArrayBuilder for ListArrayBuilder { #[cfg(not(test))] fn new(_capacity: usize) -> Self { - panic!("Must use with_type.") + panic!("please use `ListArrayBuilder::with_type` instead"); } #[cfg(test)] @@ -88,11 +66,12 @@ impl ArrayBuilder for ListArrayBuilder { let DataType::List(value_type) = ty else { panic!("data type must be DataType::List"); }; + let mut offsets = Vec::with_capacity(capacity + 1); + offsets.push(0); Self { bitmap: BitmapBuilder::with_capacity(capacity), - offsets: vec![0], + offsets, value: Box::new(value_type.create_array_builder(capacity)), - value_type: *value_type, len: 0, } } @@ -151,9 +130,8 @@ impl ArrayBuilder for ListArrayBuilder { fn finish(self) -> ListArray { ListArray { bitmap: self.bitmap.finish(), - offsets: self.offsets, + offsets: self.offsets.into(), value: Box::new(self.value.finish()), - value_type: self.value_type, } } } @@ -180,19 +158,18 @@ impl ListArrayBuilder { /// /// For example, `values (array[1]), (array[]::int[]), (null), (array[2, 3]);` stores an inner /// `I32Array` with `[1, 2, 3]`, along with offsets `[0, 1, 1, 1, 3]` and null bitmap `TTFT`. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct ListArray { - bitmap: Bitmap, - pub(super) offsets: Vec, + pub(super) bitmap: Bitmap, + pub(super) offsets: Box<[u32]>, pub(super) value: Box, - pub(super) value_type: DataType, } impl EstimateSize for ListArray { fn estimated_heap_size(&self) -> usize { self.bitmap.estimated_heap_size() - + self.offsets.capacity() * size_of::() - + self.value.estimated_heap_size() + + self.offsets.len() * size_of::() + + self.value.estimated_size() } } @@ -202,7 +179,11 @@ impl Array for ListArray { type RefItem<'a> = ListRef<'a>; unsafe fn raw_value_at_unchecked(&self, idx: usize) -> Self::RefItem<'_> { - ListRef::Indexed { arr: self, idx } + ListRef { + array: &self.value, + start: *self.offsets.get_unchecked(idx), + end: *self.offsets.get_unchecked(idx + 1), + } } fn len(&self) -> usize { @@ -215,9 +196,9 @@ impl Array for ListArray { array_type: PbArrayType::List as i32, struct_array_data: None, list_array_data: Some(Box::new(ListArrayData { - offsets: self.offsets.clone(), + offsets: self.offsets.to_vec(), value: Some(Box::new(value)), - value_type: Some(self.value_type.to_protobuf()), + value_type: Some(self.value.data_type().to_protobuf()), })), null_bitmap: Some(self.bitmap.to_protobuf()), values: vec![], @@ -237,19 +218,24 @@ impl Array for ListArray { } fn data_type(&self) -> DataType { - DataType::List(Box::new(self.value_type.clone())) + DataType::List(Box::new(self.value.data_type())) } } impl ListArray { - /// Returns the total number of elements in the flattened array. - pub fn flatten_len(&self) -> usize { - self.value.len() - } - /// Flatten the list array into a single array. + /// + /// # Example + /// + /// ```text + /// [[1,2,3],NULL,[4,5]] => [1,2,3,4,5] + /// [[[1],[2]],[[3],[4]]] => [1,2,3,4] + /// ``` pub fn flatten(&self) -> ArrayImpl { - (*self.value).clone() + match &*self.value { + ArrayImpl::List(inner) => inner.flatten(), + a => a.clone(), + } } pub fn from_protobuf(array: &PbArray) -> ArrayResult { @@ -266,9 +252,8 @@ impl ListArray { let value = ArrayImpl::from_protobuf(array_data.value.as_ref().unwrap(), flatten_len)?; let arr = ListArray { bitmap, - offsets: array_data.offsets, + offsets: array_data.offsets.into(), value: Box::new(value), - value_type: DataType::from(&array_data.value_type.unwrap()), }; Ok(arr.into()) } @@ -280,151 +265,127 @@ impl ListArray { F: FnOnce(ArrayImpl) -> Fut, Fut: Future>, { - let Self { - bitmap, - offsets, - value, - .. - } = self; - - let new_value = (f)(*value).await?; - let new_value_type = new_value.data_type(); + let new_value = (f)(*self.value).await?; Ok(Self { - offsets, - bitmap, + offsets: self.offsets, + bitmap: self.bitmap, value: Box::new(new_value), - value_type: new_value_type, }) } +} - // Used for testing purposes - pub fn from_iter( - values: impl IntoIterator>, - value_type: DataType, - ) -> ListArray { - let values = values.into_iter(); - let size_hint = values.size_hint().0; - - let mut offsets = vec![0u32]; - offsets.reserve(size_hint); - let mut builder = ArrayBuilderImpl::with_type(size_hint, value_type.clone()); - let mut bitmap = BitmapBuilder::with_capacity(size_hint); - for v in values { - bitmap.append(v.is_some()); - let last_offset = *offsets.last().unwrap(); +impl FromIterator> for ListArray +where + T: PrimitiveArrayItemType, + L: IntoIterator, +{ + fn from_iter>>(iter: I) -> Self { + let iter = iter.into_iter(); + let mut builder = ListArrayBuilder::with_type( + iter.size_hint().0, + DataType::List(Box::new(T::DATA_TYPE.clone())), + ); + for v in iter { match v { - Some(a) => { - offsets.push( - last_offset - .checked_add(a.len() as u32) - .expect("offset overflow"), - ); - builder.append_array(&a) - } - None => { - offsets.push(last_offset); + None => builder.append(None), + Some(v) => { + builder.append(Some(v.into_iter().collect::().as_scalar_ref())) } } } - ListArray { - bitmap: bitmap.finish(), - offsets, - value: Box::new(builder.finish()), - value_type, - } + builder.finish() } +} - #[cfg(test)] - pub fn values_vec(&self) -> Vec> { - use crate::types::ScalarRef; - - self.iter() - .map(|v| v.map(|s| s.to_owned_scalar())) - .collect_vec() +impl FromIterator for ListArray { + fn from_iter>(iter: I) -> Self { + let mut iter = iter.into_iter(); + let first = iter.next().expect("empty iterator"); + let mut builder = ListArrayBuilder::with_type( + iter.size_hint().0, + DataType::List(Box::new(first.data_type())), + ); + builder.append(Some(first.as_scalar_ref())); + for v in iter { + builder.append(Some(v.as_scalar_ref())); + } + builder.finish() } } -#[derive(Clone, Debug, Eq, Default, PartialEq, Hash)] +#[derive(Clone, PartialEq, Eq, EstimateSize)] pub struct ListValue { - values: Box<[Datum]>, + values: Box, } -impl PartialOrd for ListValue { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) +impl Debug for ListValue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.as_scalar_ref().fmt(f) } } -impl Ord for ListValue { - fn cmp(&self, other: &Self) -> Ordering { - self.as_scalar_ref().cmp(&other.as_scalar_ref()) +impl Display for ListValue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.as_scalar_ref().write(f) } } -impl Index for ListValue { - type Output = Datum; +impl ListValue { + pub fn new(values: ArrayImpl) -> Self { + Self { + values: Box::new(values), + } + } - fn index(&self, index: usize) -> &Self::Output { - &self.values[index] + pub fn into_array(self) -> ArrayImpl { + *self.values } -} -impl IndexMut for ListValue { - fn index_mut(&mut self, index: usize) -> &mut Self::Output { - &mut self.values[index] + pub fn empty(datatype: &DataType) -> Self { + Self::new(datatype.create_array_builder(0).finish()) } -} -// Used to display ListValue in explain for better readibilty. -pub fn display_for_explain(list: &ListValue) -> String { - // Example of ListValue display: ARRAY[1, 2, null] - format!( - "ARRAY[{}]", - list.values - .iter() - .map(|v| { - match v.as_ref() { - None => "null".into(), - Some(scalar) => scalar.as_scalar_ref_impl().to_text(), - } - }) - .collect::>() - .join(", ") - ) -} + /// Creates a new `ListValue` from an iterator of `Datum`. + pub fn from_datum_iter( + datatype: &DataType, + iter: impl IntoIterator, + ) -> Self { + let iter = iter.into_iter(); + let mut builder = datatype.create_array_builder(iter.size_hint().0); + for datum in iter { + builder.append(datum); + } + Self::new(builder.finish()) + } -impl From> for ListValue { - fn from(data: Vec) -> Self { - ListValue::new(data) + /// Returns the length of the list. + pub fn len(&self) -> usize { + self.values.len() } -} -impl From for Vec { - fn from(list: ListValue) -> Self { - list.values.into() + /// Returns `true` if the list has a length of 0. + pub fn is_empty(&self) -> bool { + self.values.is_empty() } -} -impl EstimateSize for ListValue { - fn estimated_heap_size(&self) -> usize { - // TODO: Try speed up this process. - self.values - .iter() - .map(|datum| datum.estimated_heap_size()) - .sum() + /// Iterates over the elements of the list. + pub fn iter(&self) -> impl DoubleEndedIterator + ExactSizeIterator> { + self.values.iter() } -} -impl ListValue { - pub fn new(values: Vec) -> Self { - Self { - values: values.into_boxed_slice(), + /// Get the element at the given index. Returns `None` if the index is out of bounds. + pub fn get(&self, index: usize) -> Option> { + if index < self.len() { + Some(self.values.value_at(index)) + } else { + None } } - pub fn values(&self) -> &[Datum] { - &self.values + /// Returns the data type of the elements in the list. + pub fn data_type(&self) -> DataType { + self.values.data_type() } pub fn memcmp_deserialize( @@ -433,80 +394,143 @@ impl ListValue { ) -> memcomparable::Result { let bytes = serde_bytes::ByteBuf::deserialize(deserializer)?; let mut inner_deserializer = memcomparable::Deserializer::new(bytes.as_slice()); - let mut values = Vec::new(); + let mut builder = datatype.create_array_builder(0); while inner_deserializer.has_remaining() { - values.push(memcmp_encoding::deserialize_datum_in_composite( + builder.append(memcmp_encoding::deserialize_datum_in_composite( datatype, &mut inner_deserializer, )?) } - Ok(Self::new(values)) + Ok(Self::new(builder.finish())) + } + + // Used to display ListValue in explain for better readibilty. + pub fn display_for_explain(&self) -> String { + // Example of ListValue display: ARRAY[1, 2, null] + format!( + "ARRAY[{}]", + self.iter() + .map(|v| { + match v.as_ref() { + None => "null".into(), + Some(scalar) => scalar.to_text(), + } + }) + .format(", ") + ) + } + + /// Returns a mutable slice if the list is of type `int64[]`. + pub fn as_i64_mut_slice(&mut self) -> Option<&mut [i64]> { + match self.values.as_mut() { + ArrayImpl::Int64(array) => Some(array.as_mut_slice()), + _ => None, + } + } +} + +impl PartialOrd for ListValue { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for ListValue { + fn cmp(&self, other: &Self) -> Ordering { + self.as_scalar_ref().cmp(&other.as_scalar_ref()) + } +} + +impl FromIterator> for ListValue { + fn from_iter>>(iter: I) -> Self { + Self::new(iter.into_iter().collect::>().into()) + } +} + +impl FromIterator for ListValue { + fn from_iter>(iter: I) -> Self { + Self::new(iter.into_iter().collect::>().into()) + } +} + +impl FromIterator for ListValue { + fn from_iter>(iter: I) -> Self { + Self::new(iter.into_iter().collect::().into()) + } +} + +impl<'a> FromIterator> for ListValue { + fn from_iter>>(iter: I) -> Self { + Self::new(iter.into_iter().collect::().into()) + } +} + +impl<'a> FromIterator<&'a str> for ListValue { + fn from_iter>(iter: I) -> Self { + Self::new(iter.into_iter().collect::().into()) + } +} + +impl FromIterator for ListValue { + fn from_iter>(iter: I) -> Self { + Self::new(iter.into_iter().collect::().into()) + } +} + +impl From for ArrayImpl { + fn from(value: ListValue) -> Self { + *value.values } } #[derive(Copy, Clone)] -pub enum ListRef<'a> { - Indexed { arr: &'a ListArray, idx: usize }, - ValueRef { val: &'a ListValue }, +pub struct ListRef<'a> { + array: &'a ArrayImpl, + start: u32, + end: u32, } impl<'a> ListRef<'a> { /// Returns the length of the list. pub fn len(&self) -> usize { - match self { - ListRef::Indexed { arr, idx } => (arr.offsets[*idx + 1] - arr.offsets[*idx]) as usize, - ListRef::ValueRef { val } => val.values.len(), - } + (self.end - self.start) as usize } /// Returns `true` if the list has a length of 0. pub fn is_empty(&self) -> bool { - self.len() == 0 + self.start == self.end } - /// Returns the elements in the flattened list. - pub fn flatten(self) -> Vec> { - // XXX: avoid using vector - iter_elems_ref!(self, it, { - it.flat_map(|datum_ref| { - if let Some(ScalarRefImpl::List(list_ref)) = datum_ref { - list_ref.flatten() - } else { - vec![datum_ref] - } - .into_iter() - }) - .collect() - }) + /// Returns the data type of the elements in the list. + pub fn data_type(&self) -> DataType { + self.array.data_type() } - /// Returns the total number of elements in the flattened list. - pub fn flatten_len(self) -> usize { - iter_elems_ref!(self, it, { - it.map(|datum_ref| { - if let Some(ScalarRefImpl::List(list_ref)) = datum_ref { - list_ref.flatten_len() - } else { - 1 - } - }) - .sum() - }) + /// Returns the elements in the flattened list. + pub fn flatten(self) -> ListRef<'a> { + match self.array { + ArrayImpl::List(inner) => ListRef { + array: &inner.value, + start: inner.offsets[self.start as usize], + end: inner.offsets[self.end as usize], + } + .flatten(), + _ => self, + } } /// Iterates over the elements of the list. - /// - /// Prefer using the macro `iter_elems_ref!` if possible to avoid the cost of enum dispatching. - pub fn iter(self) -> impl ExactSizeIterator> + 'a { - iter_elems_ref!(self, it, { Either::Left(it) }, { Either::Right(it) }) + pub fn iter(self) -> impl DoubleEndedIterator + ExactSizeIterator> + 'a { + (self.start..self.end).map(|i| self.array.value_at(i as usize)) } /// Get the element at the given index. Returns `None` if the index is out of bounds. pub fn get(self, index: usize) -> Option> { - iter_elems_ref!(self, it, { - let mut it = it; - it.nth(index) - }) + if index < self.len() { + Some(self.array.value_at(self.start as usize + index)) + } else { + None + } } pub fn memcmp_serialize( @@ -514,37 +538,45 @@ impl<'a> ListRef<'a> { serializer: &mut memcomparable::Serializer, ) -> memcomparable::Result<()> { let mut inner_serializer = memcomparable::Serializer::new(vec![]); - iter_elems_ref!(self, it, { - for datum_ref in it { - memcmp_encoding::serialize_datum_in_composite(datum_ref, &mut inner_serializer)? - } - }); + for datum_ref in self.iter() { + memcmp_encoding::serialize_datum_in_composite(datum_ref, &mut inner_serializer)? + } serializer.serialize_bytes(&inner_serializer.into_inner()) } pub fn hash_scalar_inner(self, state: &mut H) { - iter_elems_ref!(self, it, { - for datum_ref in it { - hash_datum(datum_ref, state); - } - }) + for datum_ref in self.iter() { + hash_datum(datum_ref, state); + } } /// estimate the serialized size with value encoding pub fn estimate_serialize_size_inner(self) -> usize { - iter_elems_ref!(self, it, { - it.fold(0, |acc, datum_ref| { - acc + estimate_serialize_datum_size(datum_ref) - }) - }) + self.iter().map(estimate_serialize_datum_size).sum() + } + + pub fn to_owned(self) -> ListValue { + let mut builder = self.array.create_builder(self.len()); + for datum_ref in self.iter() { + builder.append(datum_ref); + } + ListValue::new(builder.finish()) + } + + /// Returns a slice if the list is of type `int64[]`. + pub fn as_i64_slice(&self) -> Option<&[i64]> { + match &self.array { + ArrayImpl::Int64(array) => { + Some(&array.as_slice()[self.start as usize..self.end as usize]) + } + _ => None, + } } } impl PartialEq for ListRef<'_> { fn eq(&self, other: &Self) -> bool { - iter_elems_ref!(*self, lhs, { - iter_elems_ref!(*other, rhs, { lhs.eq(rhs) }) - }) + self.iter().eq(other.iter()) } } @@ -558,19 +590,13 @@ impl PartialOrd for ListRef<'_> { impl Ord for ListRef<'_> { fn cmp(&self, other: &Self) -> Ordering { - iter_elems_ref!(*self, lhs, { - iter_elems_ref!(*other, rhs, { - lhs.cmp_by(rhs, |lv, rv| lv.default_cmp(&rv)) - }) - }) + self.iter().cmp_by(other.iter(), |a, b| a.default_cmp(&b)) } } impl Debug for ListRef<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let mut f = f.debug_list(); - iter_elems_ref!(*self, it, { f.entries(it) }); - f.finish() + f.debug_list().entries(self.iter()).finish() } } @@ -578,38 +604,36 @@ impl ToText for ListRef<'_> { // This function will be invoked when pgwire prints a list value in string. // Refer to PostgreSQL `array_out` or `appendPGArray`. fn write(&self, f: &mut W) -> std::fmt::Result { - iter_elems_ref!(*self, it, { - write!( - f, - "{{{}}}", - it.format_with(",", |datum_ref, f| { - let s = datum_ref.to_text(); - // Never quote null or inner list, but quote empty, verbatim 'null', special - // chars and whitespaces. - let need_quote = !matches!(datum_ref, None | Some(ScalarRefImpl::List(_))) - && (s.is_empty() - || s.to_ascii_lowercase() == "null" - || s.contains([ - '"', '\\', '{', '}', ',', - // PostgreSQL `array_isspace` includes '\x0B' but rust - // [`char::is_ascii_whitespace`] does not. - ' ', '\t', '\n', '\r', '\x0B', '\x0C', - ])); - if need_quote { - f(&"\"")?; - s.chars().try_for_each(|c| { - if c == '"' || c == '\\' { - f(&"\\")?; - } - f(&c) - })?; - f(&"\"") - } else { - f(&s) - } - }) - ) - }) + write!( + f, + "{{{}}}", + self.iter().format_with(",", |datum_ref, f| { + let s = datum_ref.to_text(); + // Never quote null or inner list, but quote empty, verbatim 'null', special + // chars and whitespaces. + let need_quote = !matches!(datum_ref, None | Some(ScalarRefImpl::List(_))) + && (s.is_empty() + || s.to_ascii_lowercase() == "null" + || s.contains([ + '"', '\\', '{', '}', ',', + // PostgreSQL `array_isspace` includes '\x0B' but rust + // [`char::is_ascii_whitespace`] does not. + ' ', '\t', '\n', '\r', '\x0B', '\x0C', + ])); + if need_quote { + f(&"\"")?; + s.chars().try_for_each(|c| { + if c == '"' || c == '\\' { + f(&"\\")?; + } + f(&c) + })?; + f(&"\"") + } else { + f(&s) + } + }) + ) } fn write_with_type(&self, ty: &DataType, f: &mut W) -> std::fmt::Result { @@ -621,8 +645,18 @@ impl ToText for ListRef<'_> { } impl<'a> From<&'a ListValue> for ListRef<'a> { - fn from(val: &'a ListValue) -> Self { - ListRef::ValueRef { val } + fn from(value: &'a ListValue) -> Self { + ListRef { + array: &value.values, + start: 0, + end: value.len() as u32, + } + } +} + +impl From> for ListValue { + fn from(value: ListRef<'_>) -> Self { + value.to_owned() } } @@ -656,15 +690,15 @@ impl ListValue { } self.skip_whitespace(); if self.try_consume('}') { - return Ok(ListValue::new(vec![])); + return Ok(ListValue::empty(self.data_type.as_list())); } - let mut elems = Vec::new(); + let mut builder = ArrayBuilderImpl::with_type(0, self.data_type.as_list().clone()); loop { let mut parser = Self { input: self.input, data_type: self.data_type.as_list(), }; - elems.push(parser.parse()?); + builder.append(parser.parse()?); self.input = parser.input; // expect ',' or '}' @@ -681,7 +715,7 @@ impl ListValue { _ => return Err("Unexpected array element.".to_string()), } } - Ok(ListValue::new(elems)) + Ok(ListValue::new(builder.finish())) } /// Parse a non-array value. @@ -862,87 +896,44 @@ mod tests { use more_asserts::{assert_gt, assert_lt}; use super::*; - use crate::try_match_expand; #[test] - fn test_list_with_values() { + fn test_protobuf() { use crate::array::*; - let arr = ListArray::from_iter( - [ - Some(I32Array::from_iter([Some(12), Some(-7), Some(25)]).into()), - None, - Some(I32Array::from_iter([Some(0), Some(-127), Some(127), Some(50)]).into()), - Some(I32Array::from_iter([0; 0]).into()), - ], - DataType::Int32, - ); - let actual = ListArray::from_protobuf(&arr.to_protobuf()).unwrap(); - let tmp = ArrayImpl::List(arr); - assert_eq!(tmp, actual); - - let arr = try_match_expand!(actual, ArrayImpl::List).unwrap(); - let list_values = arr.values_vec(); - assert_eq!( - list_values, - vec![ - Some(ListValue::new(vec![ - Some(ScalarImpl::Int32(12)), - Some(ScalarImpl::Int32(-7)), - Some(ScalarImpl::Int32(25)), - ])), - None, - Some(ListValue::new(vec![ - Some(ScalarImpl::Int32(0)), - Some(ScalarImpl::Int32(-127)), - Some(ScalarImpl::Int32(127)), - Some(ScalarImpl::Int32(50)), - ])), - Some(ListValue::new(vec![])), - ] - ); - - let mut builder = ListArrayBuilder::with_type(4, DataType::List(Box::new(DataType::Int32))); - list_values.iter().for_each(|v| { - builder.append(v.as_ref().map(|s| s.as_scalar_ref())); - }); - let arr = builder.finish(); - assert_eq!(arr.values_vec(), list_values); - - let part1 = ListArray::from_iter( - [ - Some(I32Array::from_iter([Some(12), Some(-7), Some(25)]).into()), - None, - ], - DataType::Int32, - ); + let array = ListArray::from_iter([ + Some(vec![12i32, -7, 25]), + None, + Some(vec![0, -127, 127, 50]), + Some(vec![]), + ]); + let actual = ListArray::from_protobuf(&array.to_protobuf()).unwrap(); + assert_eq!(actual, ArrayImpl::List(array)); + } - let part2 = ListArray::from_iter( - [ - Some(I32Array::from_iter([Some(0), Some(-127), Some(127), Some(50)]).into()), - Some(I32Array::from_iter([0; 0]).into()), - ], - DataType::Int32, - ); + #[test] + fn test_append_array() { + let part1 = ListArray::from_iter([Some([12i32, -7, 25]), None]); + let part2 = ListArray::from_iter([Some(vec![0, -127, 127, 50]), Some(vec![])]); let mut builder = ListArrayBuilder::with_type(4, DataType::List(Box::new(DataType::Int32))); builder.append_array(&part1); builder.append_array(&part2); - assert_eq!(arr.values_vec(), builder.finish().values_vec()); + let expected = ListArray::from_iter([ + Some(vec![12i32, -7, 25]), + None, + Some(vec![0, -127, 127, 50]), + Some(vec![]), + ]); + assert_eq!(builder.finish(), expected); } // Ensure `create_builder` exactly copies the same metadata. #[test] fn test_list_create_builder() { use crate::array::*; - let arr = ListArray::from_iter( - [Some( - F32Array::from_iter([Some(2.0), Some(42.0), Some(1.0)]).into(), - )], - DataType::Float32, - ); - let builder = arr.create_builder(0); - let arr2 = builder.finish(); + let arr = ListArray::from_iter([Some([F32::from(2.0), F32::from(42.0), F32::from(1.0)])]); + let arr2 = arr.create_builder(0).finish(); assert_eq!(arr.data_type(), arr2.data_type()); } @@ -953,8 +944,8 @@ mod tests { { let mut builder = ListArrayBuilder::with_type(1, DataType::List(Box::new(DataType::Int32))); - let val = ListValue::new(vec![Some(1.into()), Some(2.into()), Some(3.into())]); - builder.append(Some(ListRef::ValueRef { val: &val })); + let val = ListValue::from_iter([1i32, 2, 3]); + builder.append(Some(val.as_scalar_ref())); assert!(builder.pop().is_some()); assert!(builder.pop().is_none()); let arr = builder.finish(); @@ -962,28 +953,24 @@ mod tests { } { - let meta = DataType::List(Box::new(DataType::List(Box::new(DataType::Int32)))); - let mut builder = ListArrayBuilder::with_type(2, meta); - let val1 = ListValue::new(vec![Some(1.into()), Some(2.into()), Some(3.into())]); - let val2 = ListValue::new(vec![Some(1.into()), Some(2.into()), Some(3.into())]); - let list1 = ListValue::new(vec![Some(val1.into()), Some(val2.into())]); - builder.append(Some(ListRef::ValueRef { val: &list1 })); + let data_type = DataType::List(Box::new(DataType::List(Box::new(DataType::Int32)))); + let mut builder = ListArrayBuilder::with_type(2, data_type); + let val1 = ListValue::from_iter([1, 2, 3]); + let val2 = ListValue::from_iter([1, 2, 3]); + let list1 = ListValue::from_iter([val1, val2]); + builder.append(Some(list1.as_scalar_ref())); - let val3 = ListValue::new(vec![Some(1.into()), Some(2.into()), Some(3.into())]); - let val4 = ListValue::new(vec![Some(1.into()), Some(2.into()), Some(3.into())]); - let list2 = ListValue::new(vec![Some(val3.into()), Some(val4.into())]); + let val3 = ListValue::from_iter([1, 2, 3]); + let val4 = ListValue::from_iter([1, 2, 3]); + let list2 = ListValue::from_iter([val3, val4]); - builder.append(Some(ListRef::ValueRef { val: &list2 })); + builder.append(Some(list2.as_scalar_ref())); assert!(builder.pop().is_some()); let arr = builder.finish(); assert_eq!(arr.len(), 1); - - let val = arr.value_at(0).unwrap(); - - let datums = val.iter().map(ToOwnedDatum::to_owned_datum).collect_vec(); - assert_eq!(datums, list1.values.to_vec()); + assert_eq!(arr.value_at(0).unwrap(), list1.as_scalar_ref()); } } @@ -991,141 +978,69 @@ mod tests { fn test_list_nested_layout() { use crate::array::*; - let listarray1 = ListArray::from_iter( - [ - Some(I32Array::from_iter([Some(1), Some(2)]).into()), - Some(I32Array::from_iter([Some(3), Some(4)]).into()), - ], - DataType::Int32, - ); - - let listarray2 = ListArray::from_iter( - [ - Some(I32Array::from_iter([Some(5), Some(6), Some(7)]).into()), - None, - Some(I32Array::from_iter([Some(8)]).into()), - ], - DataType::Int32, - ); - - let listarray3 = ListArray::from_iter( - [Some(I32Array::from_iter([Some(9), Some(10)]).into())], - DataType::Int32, - ); + let listarray1 = ListArray::from_iter([Some([1i32, 2]), Some([3, 4])]); + let listarray2 = ListArray::from_iter([Some(vec![5, 6, 7]), None, Some(vec![8])]); + let listarray3 = ListArray::from_iter([Some([9, 10])]); let nestarray = ListArray::from_iter( - [ - Some(listarray1.into()), - Some(listarray2.into()), - Some(listarray3.into()), - ], - DataType::List(Box::new(DataType::Int32)), + [listarray1, listarray2, listarray3] + .into_iter() + .map(|l| ListValue::new(l.into())), ); let actual = ListArray::from_protobuf(&nestarray.to_protobuf()).unwrap(); assert_eq!(ArrayImpl::List(nestarray), actual); - - let nestarray = try_match_expand!(actual, ArrayImpl::List).unwrap(); - let nested_list_values = nestarray.values_vec(); - assert_eq!( - nested_list_values, - vec![ - Some(ListValue::new(vec![ - Some(ScalarImpl::List(ListValue::new(vec![ - Some(ScalarImpl::Int32(1)), - Some(ScalarImpl::Int32(2)), - ]))), - Some(ScalarImpl::List(ListValue::new(vec![ - Some(ScalarImpl::Int32(3)), - Some(ScalarImpl::Int32(4)), - ]))), - ])), - Some(ListValue::new(vec![ - Some(ScalarImpl::List(ListValue::new(vec![ - Some(ScalarImpl::Int32(5)), - Some(ScalarImpl::Int32(6)), - Some(ScalarImpl::Int32(7)), - ]))), - None, - Some(ScalarImpl::List(ListValue::new(vec![Some( - ScalarImpl::Int32(8) - ),]))), - ])), - Some(ListValue::new(vec![Some(ScalarImpl::List( - ListValue::new(vec![ - Some(ScalarImpl::Int32(9)), - Some(ScalarImpl::Int32(10)), - ]) - )),])), - ] - ); - - let mut builder = ListArrayBuilder::with_type( - 3, - DataType::List(Box::new(DataType::List(Box::new(DataType::Int32)))), - ); - for v in &nested_list_values { - builder.append(v.as_ref().map(|s| s.as_scalar_ref())); - } - let nestarray = builder.finish(); - assert_eq!(nestarray.values_vec(), nested_list_values); } #[test] fn test_list_value_cmp() { // ARRAY[1, 1] < ARRAY[1, 2, 1] assert_lt!( - ListValue::new(vec![Some(1.into()), Some(1.into())]), - ListValue::new(vec![Some(1.into()), Some(2.into()), Some(1.into())]), + ListValue::from_iter([1, 1]), + ListValue::from_iter([1, 2, 1]), ); // ARRAY[1, 2] < ARRAY[1, 2, 1] assert_lt!( - ListValue::new(vec![Some(1.into()), Some(2.into())]), - ListValue::new(vec![Some(1.into()), Some(2.into()), Some(1.into())]), + ListValue::from_iter([1, 2]), + ListValue::from_iter([1, 2, 1]), ); // ARRAY[1, 3] > ARRAY[1, 2, 1] assert_gt!( - ListValue::new(vec![Some(1.into()), Some(3.into())]), - ListValue::new(vec![Some(1.into()), Some(2.into()), Some(1.into())]), + ListValue::from_iter([1, 3]), + ListValue::from_iter([1, 2, 1]), ); // null > 1 assert_gt!( - ListValue::new(vec![None]), - ListValue::new(vec![Some(1.into())]), + ListValue::from_iter([None::]), + ListValue::from_iter([1]), ); // ARRAY[1, 2, null] > ARRAY[1, 2, 1] assert_gt!( - ListValue::new(vec![Some(1.into()), Some(2.into()), None]), - ListValue::new(vec![Some(1.into()), Some(2.into()), Some(1.into())]), + ListValue::from_iter([Some(1), Some(2), None]), + ListValue::from_iter([Some(1), Some(2), Some(1)]), ); // Null value in first ARRAY results into a Greater ordering regardless of the smaller ARRAY // length. ARRAY[1, null] > ARRAY[1, 2, 3] assert_gt!( - ListValue::new(vec![Some(1.into()), None]), - ListValue::new(vec![Some(1.into()), Some(2.into()), Some(3.into())]), + ListValue::from_iter([Some(1), None]), + ListValue::from_iter([Some(1), Some(2), Some(3)]), ); // ARRAY[1, null] == ARRAY[1, null] assert_eq!( - ListValue::new(vec![Some(1.into()), None]), - ListValue::new(vec![Some(1.into()), None]), + ListValue::from_iter([Some(1), None]), + ListValue::from_iter([Some(1), None]), ); } #[test] fn test_list_ref_display() { - let v = ListValue::new(vec![Some(1.into()), None]); - let r = ListRef::ValueRef { val: &v }; - assert_eq!("{1,NULL}".to_string(), format!("{}", r.to_text())); + let v = ListValue::from_iter([Some(1), None]); + assert_eq!(v.to_string(), "{1,NULL}"); } #[test] fn test_serialize_deserialize() { - let value = ListValue::new(vec![ - Some("abcd".into()), - Some("".into()), - None, - Some("a".into()), - ]); - let list_ref = ListRef::ValueRef { val: &value }; + let value = ListValue::from_iter([Some("abcd"), Some(""), None, Some("a")]); + let list_ref = value.as_scalar_ref(); let mut serializer = memcomparable::Serializer::new(vec![]); serializer.set_reverse(true); list_ref.memcmp_serialize(&mut serializer).unwrap(); @@ -1156,84 +1071,39 @@ mod tests { fn test_memcomparable() { let cases = [ ( - ListValue::new(vec![ - Some(123.to_scalar_value()), - Some(456.to_scalar_value()), - ]), - ListValue::new(vec![ - Some(123.to_scalar_value()), - Some(789.to_scalar_value()), - ]), - DataType::Int32, - Ordering::Less, + ListValue::from_iter([123, 456]), + ListValue::from_iter([123, 789]), ), ( - ListValue::new(vec![ - Some(123.to_scalar_value()), - Some(456.to_scalar_value()), - ]), - ListValue::new(vec![Some(123.to_scalar_value())]), - DataType::Int32, - Ordering::Greater, + ListValue::from_iter([123, 456]), + ListValue::from_iter([123]), ), ( - ListValue::new(vec![None, Some("".into())]), - ListValue::new(vec![None, None]), - DataType::Varchar, - Ordering::Less, + ListValue::from_iter([None, Some("")]), + ListValue::from_iter([None, None::<&str>]), ), ( - ListValue::new(vec![Some(2.to_scalar_value())]), - ListValue::new(vec![ - Some(1.to_scalar_value()), - None, - Some(3.to_scalar_value()), - ]), - DataType::Int32, - Ordering::Greater, + ListValue::from_iter([Some(2)]), + ListValue::from_iter([Some(1), None, Some(3)]), ), ]; - for (lhs, rhs, datatype, order) in cases { + for (lhs, rhs) in cases { let lhs_serialized = { let mut serializer = memcomparable::Serializer::new(vec![]); - ListRef::ValueRef { val: &lhs } + lhs.as_scalar_ref() .memcmp_serialize(&mut serializer) .unwrap(); serializer.into_inner() }; let rhs_serialized = { let mut serializer = memcomparable::Serializer::new(vec![]); - ListRef::ValueRef { val: &rhs } + rhs.as_scalar_ref() .memcmp_serialize(&mut serializer) .unwrap(); serializer.into_inner() }; - assert_eq!(lhs_serialized.cmp(&rhs_serialized), order); - - let mut builder = ListArrayBuilder::with_type(0, DataType::List(Box::new(datatype))); - builder.append(Some(ListRef::ValueRef { val: &lhs })); - builder.append(Some(ListRef::ValueRef { val: &rhs })); - let array = builder.finish(); - let lhs_serialized = { - let mut serializer = memcomparable::Serializer::new(vec![]); - array - .value_at(0) - .unwrap() - .memcmp_serialize(&mut serializer) - .unwrap(); - serializer.into_inner() - }; - let rhs_serialized = { - let mut serializer = memcomparable::Serializer::new(vec![]); - array - .value_at(1) - .unwrap() - .memcmp_serialize(&mut serializer) - .unwrap(); - serializer.into_inner() - }; - assert_eq!(lhs_serialized.cmp(&rhs_serialized), order); + assert_eq!(lhs_serialized.cmp(&rhs_serialized), lhs.cmp(&rhs)); } } @@ -1241,28 +1111,12 @@ mod tests { fn test_listref() { use crate::array::*; use crate::types; - let arr = ListArray::from_iter( - [ - Some(I32Array::from_iter([Some(1), Some(2), Some(3)]).into()), - None, - Some(I32Array::from_iter([Some(4), Some(5), Some(6), Some(7)]).into()), - ], - DataType::Int32, - ); + + let arr = ListArray::from_iter([Some(vec![1, 2, 3]), None, Some(vec![4, 5, 6, 7])]); // get 3rd ListRef from ListArray let list_ref = arr.value_at(2).unwrap(); - assert_eq!( - list_ref, - ListRef::ValueRef { - val: &ListValue::new(vec![ - Some(4.to_scalar_value()), - Some(5.to_scalar_value()), - Some(6.to_scalar_value()), - Some(7.to_scalar_value()), - ]), - } - ); + assert_eq!(list_ref, ListValue::from_iter([4, 5, 6, 7]).as_scalar_ref()); // Get 2nd value from ListRef let scalar = list_ref.get(1).unwrap(); diff --git a/src/common/src/array/mod.rs b/src/common/src/array/mod.rs index 39ac3023049c8..086f7ffd5cc9d 100644 --- a/src/common/src/array/mod.rs +++ b/src/common/src/array/mod.rs @@ -324,7 +324,7 @@ impl CompactableArray for A { macro_rules! array_impl_enum { ( $( { $variant_name:ident, $suffix_name:ident, $array:ty, $builder:ty } ),*) => { /// `ArrayImpl` embeds all possible array in `array` module. - #[derive(Debug, Clone)] + #[derive(Debug, Clone, EstimateSize)] pub enum ArrayImpl { $( $variant_name($array) ),* } @@ -441,7 +441,7 @@ for_all_array_variants! { impl_convert } macro_rules! array_builder_impl_enum { ($( { $variant_name:ident, $suffix_name:ident, $array:ty, $builder:ty } ),*) => { /// `ArrayBuilderImpl` embeds all possible array in `array` module. - #[derive(Debug)] + #[derive(Debug, Clone, EstimateSize)] pub enum ArrayBuilderImpl { $( $variant_name($builder) ),* } @@ -613,12 +613,6 @@ impl ArrayImpl { } } -impl EstimateSize for ArrayImpl { - fn estimated_heap_size(&self) -> usize { - dispatch_array_variants!(self, inner, { inner.estimated_heap_size() }) - } -} - pub type ArrayRef = Arc; impl PartialEq for ArrayImpl { @@ -627,6 +621,8 @@ impl PartialEq for ArrayImpl { } } +impl Eq for ArrayImpl {} + #[cfg(test)] mod tests { diff --git a/src/common/src/array/num256_array.rs b/src/common/src/array/num256_array.rs index 9845ead46ba05..e610be87f357b 100644 --- a/src/common/src/array/num256_array.rs +++ b/src/common/src/array/num256_array.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::io::{Cursor, Read}; -use std::mem::size_of; use ethnum::I256; use risingwave_pb::common::buffer::CompressionType; @@ -22,21 +21,23 @@ use risingwave_pb::data::PbArray; use crate::array::{Array, ArrayBuilder, ArrayImpl, ArrayResult}; use crate::buffer::{Bitmap, BitmapBuilder}; -use crate::estimate_size::EstimateSize; +use crate::estimate_size::{EstimateSize, ZeroHeapSize}; use crate::types::{DataType, Int256, Int256Ref, Scalar}; -#[derive(Debug)] +#[derive(Debug, Clone, EstimateSize)] pub struct Int256ArrayBuilder { bitmap: BitmapBuilder, data: Vec, } -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, EstimateSize)] pub struct Int256Array { bitmap: Bitmap, - data: Vec, + data: Box<[I256]>, } +impl ZeroHeapSize for I256 {} + #[rustfmt::skip] macro_rules! impl_array_for_num256 { ( @@ -149,7 +150,7 @@ macro_rules! impl_array_for_num256 { fn finish(self) -> Self::ArrayType { Self::ArrayType { bitmap: self.bitmap.finish(), - data: self.data, + data: self.data.into(), } } } @@ -204,15 +205,9 @@ impl_array_for_num256!( Int256 ); -impl EstimateSize for Int256Array { - fn estimated_heap_size(&self) -> usize { - self.bitmap.estimated_heap_size() + self.data.capacity() * size_of::() - } -} - impl FromIterator for Int256Array { fn from_iter>(iter: I) -> Self { - let data: Vec = iter.into_iter().map(|i| *i.0).collect(); + let data: Box<[I256]> = iter.into_iter().map(|i| *i.0).collect(); Int256Array { bitmap: Bitmap::ones(data.len()), data, diff --git a/src/common/src/array/primitive_array.rs b/src/common/src/array/primitive_array.rs index 1bdce4c9104c6..fbf458644c0f9 100644 --- a/src/common/src/array/primitive_array.rs +++ b/src/common/src/array/primitive_array.rs @@ -22,7 +22,7 @@ use risingwave_pb::data::{ArrayType, PbArray}; use super::{Array, ArrayBuilder, ArrayImpl, ArrayResult}; use crate::buffer::{Bitmap, BitmapBuilder}; -use crate::estimate_size::EstimateSize; +use crate::estimate_size::{EstimateSize, ZeroHeapSize}; use crate::for_all_native_types; use crate::types::*; @@ -32,7 +32,7 @@ where for<'a> Self: Sized + Default + PartialOrd - + EstimateSize + + ZeroHeapSize + Scalar = Self> + ScalarRef<'a, ScalarType = Self>, { @@ -121,10 +121,10 @@ impl_primitive_for_others! { } /// `PrimitiveArray` is a collection of primitive types, such as `i32`, `f32`. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq, EstimateSize)] pub struct PrimitiveArray { bitmap: Bitmap, - data: Vec, + data: Box<[T]>, } impl FromIterator> for PrimitiveArray { @@ -146,7 +146,7 @@ impl<'a, T: PrimitiveArrayItemType> FromIterator<&'a Option> for PrimitiveArr impl FromIterator for PrimitiveArray { fn from_iter>(iter: I) -> Self { - let data: Vec = iter.into_iter().collect(); + let data: Box<[T]> = iter.into_iter().collect(); PrimitiveArray { bitmap: Bitmap::ones(data.len()), data, @@ -183,10 +183,20 @@ impl PrimitiveArray { /// /// NOTE: The length of `bitmap` must be equal to the length of `iter`. pub fn from_iter_bitmap(iter: impl IntoIterator, bitmap: Bitmap) -> Self { - let data: Vec = iter.into_iter().collect(); + let data: Box<[T]> = iter.into_iter().collect(); assert_eq!(data.len(), bitmap.len()); PrimitiveArray { bitmap, data } } + + /// Returns a slice containing the entire array. + pub fn as_slice(&self) -> &[T] { + &self.data + } + + /// Returns a mutable slice containing the entire array. + pub fn as_mut_slice(&mut self) -> &mut [T] { + &mut self.data + } } impl Array for PrimitiveArray { @@ -245,7 +255,7 @@ impl Array for PrimitiveArray { } /// `PrimitiveArrayBuilder` constructs a `PrimitiveArray` from `Option`. -#[derive(Debug)] +#[derive(Debug, Clone, EstimateSize)] pub struct PrimitiveArrayBuilder { bitmap: BitmapBuilder, data: Vec, @@ -297,17 +307,11 @@ impl ArrayBuilder for PrimitiveArrayBuilder { fn finish(self) -> PrimitiveArray { PrimitiveArray { bitmap: self.bitmap.finish(), - data: self.data, + data: self.data.into(), } } } -impl EstimateSize for PrimitiveArray { - fn estimated_heap_size(&self) -> usize { - self.bitmap.estimated_heap_size() + self.data.capacity() * size_of::() - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/common/src/array/struct_array.rs b/src/common/src/array/struct_array.rs index 492a7ca3aed1d..f303b1cb8d8d4 100644 --- a/src/common/src/array/struct_array.rs +++ b/src/common/src/array/struct_array.rs @@ -52,7 +52,7 @@ macro_rules! iter_fields_ref { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct StructArrayBuilder { bitmap: BitmapBuilder, pub(super) children_array: Vec, @@ -111,7 +111,11 @@ impl ArrayBuilder for StructArrayBuilder { fn append_array(&mut self, other: &StructArray) { self.bitmap.append_bitmap(&other.bitmap); - for (a, o) in self.children_array.iter_mut().zip_eq_fast(&other.children) { + for (a, o) in self + .children_array + .iter_mut() + .zip_eq_fast(other.children.iter()) + { a.append_array(o); } self.len += other.len(); @@ -144,10 +148,21 @@ impl ArrayBuilder for StructArrayBuilder { } } +impl EstimateSize for StructArrayBuilder { + fn estimated_heap_size(&self) -> usize { + self.bitmap.estimated_heap_size() + + self + .children_array + .iter() + .map(|a| a.estimated_heap_size()) + .sum::() + } +} + #[derive(Debug, Clone, PartialEq)] pub struct StructArray { bitmap: Bitmap, - children: Vec, + children: Box<[ArrayRef]>, type_: StructType, heap_size: usize, } @@ -207,7 +222,7 @@ impl StructArray { Self { bitmap, - children, + children: children.into(), type_, heap_size, } diff --git a/src/common/src/array/utf8_array.rs b/src/common/src/array/utf8_array.rs index 577e5e4f4005d..235f6e75e85b1 100644 --- a/src/common/src/array/utf8_array.rs +++ b/src/common/src/array/utf8_array.rs @@ -119,7 +119,7 @@ impl Utf8Array { } /// `Utf8ArrayBuilder` use `&str` to build an `Utf8Array`. -#[derive(Debug)] +#[derive(Debug, Clone, EstimateSize)] pub struct Utf8ArrayBuilder { bytes: BytesArrayBuilder, } diff --git a/src/common/src/buffer/bitmap.rs b/src/common/src/buffer/bitmap.rs index 9e725a17ba6ef..dd76f59ade44f 100644 --- a/src/common/src/buffer/bitmap.rs +++ b/src/common/src/buffer/bitmap.rs @@ -44,7 +44,7 @@ use risingwave_pb::common::PbBuffer; use crate::estimate_size::EstimateSize; -#[derive(Default, Debug)] +#[derive(Default, Debug, Clone, EstimateSize)] pub struct BitmapBuilder { len: usize, data: Vec, diff --git a/src/common/src/field_generator/mod.rs b/src/common/src/field_generator/mod.rs index 0daa2b640c607..cf5995f9d287f 100644 --- a/src/common/src/field_generator/mod.rs +++ b/src/common/src/field_generator/mod.rs @@ -292,13 +292,35 @@ impl FieldGeneratorImpl { Some(ScalarImpl::Struct(StructValue::new(data))) } FieldGeneratorImpl::List(field, list_length) => { - let data = (0..*list_length) - .map(|_| field.generate_datum(offset)) - .collect::>(); - Some(ScalarImpl::List(ListValue::new(data))) + Some(ScalarImpl::List(ListValue::from_datum_iter( + &field.data_type(), + std::iter::repeat_with(|| field.generate_datum(offset)).take(*list_length), + ))) } } } + + fn data_type(&self) -> DataType { + match self { + Self::I16Sequence(_) => DataType::Int16, + Self::I32Sequence(_) => DataType::Int32, + Self::I64Sequence(_) => DataType::Int64, + Self::F32Sequence(_) => DataType::Float32, + Self::F64Sequence(_) => DataType::Float64, + Self::I16Random(_) => DataType::Int16, + Self::I32Random(_) => DataType::Int32, + Self::I64Random(_) => DataType::Int64, + Self::F32Random(_) => DataType::Float32, + Self::F64Random(_) => DataType::Float64, + Self::VarcharRandomFixedLength(_) => DataType::Varchar, + Self::VarcharRandomVariableLength(_) => DataType::Varchar, + Self::VarcharConstant => DataType::Varchar, + Self::Timestamp(_) => DataType::Timestamp, + Self::Timestamptz(_) => DataType::Timestamptz, + Self::Struct(_) => todo!("data_type for struct"), + Self::List(inner, _) => DataType::List(Box::new(inner.data_type())), + } + } } #[cfg(test)] diff --git a/src/common/src/test_utils/rand_array.rs b/src/common/src/test_utils/rand_array.rs index f2dd8ad42854b..7c134324ebdd7 100644 --- a/src/common/src/test_utils/rand_array.rs +++ b/src/common/src/test_utils/rand_array.rs @@ -146,8 +146,8 @@ impl RandValue for StructValue { } impl RandValue for ListValue { - fn rand_value(_rand: &mut R) -> Self { - ListValue::new(vec![]) + fn rand_value(rand: &mut R) -> Self { + ListValue::from_iter([rand.gen::()]) } } diff --git a/src/common/src/types/decimal.rs b/src/common/src/types/decimal.rs index 861ccce33a575..4e9ba74f2db63 100644 --- a/src/common/src/types/decimal.rs +++ b/src/common/src/types/decimal.rs @@ -28,13 +28,11 @@ use super::to_binary::ToBinary; use super::to_text::ToText; use super::DataType; use crate::array::ArrayResult; -use crate::estimate_size::EstimateSize; +use crate::estimate_size::ZeroHeapSize; use crate::types::ordered_float::OrderedFloat; use crate::types::Decimal::Normalized; -#[derive( - Debug, Copy, parse_display::Display, Clone, PartialEq, Hash, Eq, Ord, PartialOrd, EstimateSize, -)] +#[derive(Debug, Copy, parse_display::Display, Clone, PartialEq, Hash, Eq, Ord, PartialOrd)] pub enum Decimal { #[display("-Infinity")] NegativeInf, @@ -46,6 +44,8 @@ pub enum Decimal { NaN, } +impl ZeroHeapSize for Decimal {} + impl ToText for Decimal { fn write(&self, f: &mut W) -> std::fmt::Result { write!(f, "{self}") @@ -771,6 +771,7 @@ mod tests { use itertools::Itertools as _; use super::*; + use crate::estimate_size::EstimateSize; use crate::util::iter_util::ZipEqFast; fn check(lhs: f32, rhs: f32) -> bool { diff --git a/src/common/src/types/interval.rs b/src/common/src/types/interval.rs index 0990a5c8e61f4..52b7f2f4299c7 100644 --- a/src/common/src/types/interval.rs +++ b/src/common/src/types/interval.rs @@ -31,7 +31,7 @@ use rust_decimal::prelude::Decimal; use super::to_binary::ToBinary; use super::*; -use crate::estimate_size::EstimateSize; +use crate::estimate_size::ZeroHeapSize; /// Every interval can be represented by a `Interval`. /// @@ -42,13 +42,15 @@ use crate::estimate_size::EstimateSize; /// One month may contain 28/31 days. One day may contain 23/25 hours. /// This internals is learned from PG: /// -#[derive(Debug, Clone, Copy, Default, EstimateSize)] +#[derive(Debug, Clone, Copy, Default)] pub struct Interval { months: i32, days: i32, usecs: i64, } +impl ZeroHeapSize for Interval {} + const USECS_PER_SEC: i64 = 1_000_000; const USECS_PER_DAY: i64 = 86400 * USECS_PER_SEC; const USECS_PER_MONTH: i64 = 30 * USECS_PER_DAY; diff --git a/src/common/src/types/mod.rs b/src/common/src/types/mod.rs index 2450c2664e273..40e331227c8d6 100644 --- a/src/common/src/types/mod.rs +++ b/src/common/src/types/mod.rs @@ -423,7 +423,7 @@ impl DataType { .map(|data_type| Some(data_type.min_value())) .collect_vec(), )), - DataType::List { .. } => ScalarImpl::List(ListValue::new(vec![])), + DataType::List(data_type) => ScalarImpl::List(ListValue::empty(data_type)), } } @@ -913,16 +913,16 @@ impl ScalarImpl { DataType::Jsonb => { Self::Jsonb(JsonbVal::from_str(str).map_err(|_| FromSqlError::from_text(str))?) } - DataType::List(datatype) => { + DataType::List(elem_type) => { // TODO: support nested list if !(str.starts_with('{') && str.ends_with('}')) { return Err(FromSqlError::from_text(str)); } - let mut values = vec![]; + let mut builder = elem_type.create_array_builder(0); for s in str[1..str.len() - 1].split(',') { - values.push(Some(Self::from_text(s.trim().as_bytes(), datatype)?)); + builder.append(Some(Self::from_text(s.trim().as_bytes(), elem_type)?)); } - Self::List(ListValue::new(values)) + Self::List(ListValue::new(builder.finish())) } DataType::Struct(s) => { if !(str.starts_with('{') && str.ends_with('}')) { @@ -1179,7 +1179,7 @@ mod tests { } assert_item_size_eq!(StructArray, 16); // Box<[Datum]> - assert_item_size_eq!(ListArray, 16); // Box<[Datum]> + assert_item_size_eq!(ListArray, 8); // Box assert_item_size_eq!(Utf8Array, 16); // Box assert_item_size_eq!(IntervalArray, 16); assert_item_size_eq!(TimestampArray, 12); @@ -1188,6 +1188,7 @@ mod tests { assert_item_size_eq!(DecimalArray, 20); const_assert_eq!(std::mem::size_of::(), 24); + const_assert_eq!(std::mem::size_of::>(), 24); const_assert_eq!(std::mem::size_of::(), 24); const_assert_eq!(std::mem::size_of::(), 8); const_assert_eq!(std::mem::size_of::(), 16); @@ -1296,10 +1297,7 @@ mod tests { ])), ), DataTypeName::List => ( - ScalarImpl::List(ListValue::new(vec![ - ScalarImpl::Int64(233).into(), - ScalarImpl::Int64(2333).into(), - ])), + ScalarImpl::List(ListValue::from_iter([233i64, 2333])), DataType::List(Box::new(DataType::Int64)), ), }; diff --git a/src/common/src/types/ordered_float.rs b/src/common/src/types/ordered_float.rs index f6cc12140125b..f22fe395bea7a 100644 --- a/src/common/src/types/ordered_float.rs +++ b/src/common/src/types/ordered_float.rs @@ -118,11 +118,7 @@ impl AsMut for OrderedFloat { } } -impl EstimateSize for OrderedFloat { - fn estimated_heap_size(&self) -> usize { - 0 - } -} +impl ZeroHeapSize for OrderedFloat {} impl<'a, T: Float> From<&'a T> for &'a OrderedFloat { #[inline] @@ -988,7 +984,7 @@ mod impl_into_ordered { pub use impl_into_ordered::IntoOrdered; use serde::Serialize; -use crate::estimate_size::EstimateSize; +use crate::estimate_size::ZeroHeapSize; #[cfg(test)] mod tests { diff --git a/src/common/src/types/scalar_impl.rs b/src/common/src/types/scalar_impl.rs index 7c3a3b380eef9..25d8c8d9ca721 100644 --- a/src/common/src/types/scalar_impl.rs +++ b/src/common/src/types/scalar_impl.rs @@ -89,7 +89,7 @@ impl Scalar for ListValue { type ScalarRefType<'a> = ListRef<'a>; fn as_scalar_ref(&self) -> ListRef<'_> { - ListRef::ValueRef { val: self } + self.into() } } @@ -310,11 +310,7 @@ impl<'a> ScalarRef<'a> for ListRef<'a> { type ScalarType = ListValue; fn to_owned_scalar(&self) -> ListValue { - let fields = self - .iter() - .map(|f| f.map(|s| s.into_scalar_impl())) - .collect(); - ListValue::new(fields) + (*self).into() } fn hash_scalar(&self, state: &mut H) { diff --git a/src/common/src/types/serial.rs b/src/common/src/types/serial.rs index 6601676c93029..ad807a90abed9 100644 --- a/src/common/src/types/serial.rs +++ b/src/common/src/types/serial.rs @@ -17,7 +17,7 @@ use std::hash::Hash; use postgres_types::{ToSql as _, Type}; use serde::{Serialize, Serializer}; -use crate::estimate_size::EstimateSize; +use crate::estimate_size::ZeroHeapSize; use crate::util::row_id::RowId; // Serial is an alias for i64 @@ -30,11 +30,7 @@ impl From for Serial { } } -impl EstimateSize for Serial { - fn estimated_heap_size(&self) -> usize { - 0 - } -} +impl ZeroHeapSize for Serial {} impl Serial { #[inline] diff --git a/src/common/src/util/memcmp_encoding.rs b/src/common/src/util/memcmp_encoding.rs index 58ad76900b081..3d72b675e565c 100644 --- a/src/common/src/util/memcmp_encoding.rs +++ b/src/common/src/util/memcmp_encoding.rs @@ -490,13 +490,9 @@ mod tests { // NOTE: `NULL`s inside composite type values are always the largest. let list_none = Datum::None; - let list_1 = Datum::Some( - ListValue::new(vec![Some(ScalarImpl::from(1)), Some(ScalarImpl::from(2))]).into(), - ); - let list_2 = Datum::Some( - ListValue::new(vec![Some(ScalarImpl::from(1)), Some(ScalarImpl::from(3))]).into(), - ); - let list_3 = Datum::Some(ListValue::new(vec![Some(ScalarImpl::from(1)), None]).into()); + let list_1 = Datum::Some(ListValue::from_iter([1, 2]).into()); + let list_2 = Datum::Some(ListValue::from_iter([1, 3]).into()); + let list_3 = Datum::Some(ListValue::from_iter([Some(1), None]).into()); { // ASC NULLS FIRST (NULLS SMALLEST) diff --git a/src/common/src/util/sort_util.rs b/src/common/src/util/sort_util.rs index 091dbe3cfaa14..baeb248c87a68 100644 --- a/src/common/src/util/sort_util.rs +++ b/src/common/src/util/sort_util.rs @@ -675,10 +675,7 @@ mod tests { Some(ScalarImpl::Int32(1)), Some(ScalarImpl::Float32(3.0.into())), ]))), - Some(ScalarImpl::List(ListValue::new(vec![ - Some(ScalarImpl::Int32(1)), - Some(ScalarImpl::Int32(2)), - ]))), + Some(ScalarImpl::List(ListValue::from_iter([1, 2]))), ]); let row2 = OwnedRow::new(vec![ Some(ScalarImpl::Int16(16)), @@ -697,10 +694,7 @@ mod tests { Some(ScalarImpl::Int32(1)), Some(ScalarImpl::Float32(33333.0.into())), // larger than row1 ]))), - Some(ScalarImpl::List(ListValue::new(vec![ - Some(ScalarImpl::Int32(1)), - Some(ScalarImpl::Int32(2)), - ]))), + Some(ScalarImpl::List(ListValue::from_iter([1, 2]))), ]); let column_orders = (0..row1.len()) diff --git a/src/common/src/util/value_encoding/mod.rs b/src/common/src/util/value_encoding/mod.rs index 7068cc427735b..65bfa6231253f 100644 --- a/src/common/src/util/value_encoding/mod.rs +++ b/src/common/src/util/value_encoding/mod.rs @@ -366,11 +366,11 @@ fn deserialize_struct(struct_def: &StructType, data: &mut impl Buf) -> Result Result { let len = data.get_u32_le(); - let mut values = Vec::with_capacity(len as usize); + let mut builder = item_type.create_array_builder(len as usize); for _ in 0..len { - values.push(inner_deserialize_datum(data, item_type)?); + builder.append(inner_deserialize_datum(data, item_type)?); } - Ok(ScalarImpl::List(ListValue::new(values))) + Ok(ScalarImpl::List(ListValue::new(builder.finish()))) } fn deserialize_str(data: &mut impl Buf) -> Result> { @@ -493,10 +493,7 @@ mod tests { ScalarImpl::Int64(233).into(), ScalarImpl::Float64(23.33.into()).into(), ]))); - test_estimate_serialize_scalar_size(ScalarImpl::List(ListValue::new(vec![ - ScalarImpl::Int64(233).into(), - ScalarImpl::Int64(2333).into(), - ]))); + test_estimate_serialize_scalar_size(ScalarImpl::List(ListValue::from_iter([233i64, 2333]))); } #[test] diff --git a/src/connector/src/parser/protobuf/parser.rs b/src/connector/src/parser/protobuf/parser.rs index 99ebafe46bd3c..bd661acf9861f 100644 --- a/src/connector/src/parser/protobuf/parser.rs +++ b/src/connector/src/parser/protobuf/parser.rs @@ -487,11 +487,12 @@ pub fn from_protobuf_value( } } Value::List(values) => { - let rw_values = values - .iter() - .map(|value| from_protobuf_value(field_desc, value, descriptor_pool)) - .collect::>>()?; - ScalarImpl::List(ListValue::new(rw_values)) + let data_type = protobuf_type_mapping(field_desc, &mut vec![])?; + let mut builder = data_type.as_list().create_array_builder(values.len()); + for value in values { + builder.append(from_protobuf_value(field_desc, value, descriptor_pool)?); + } + ScalarImpl::List(ListValue::new(builder.finish())) } Value::Bytes(value) => ScalarImpl::Bytea(value.to_vec().into_boxed_slice()), _ => { @@ -580,7 +581,7 @@ mod test { use std::path::PathBuf; use prost::Message; - use risingwave_common::types::{DataType, StructType}; + use risingwave_common::types::{DataType, ListValue, StructType}; use risingwave_pb::catalog::StreamSourceInfo; use risingwave_pb::data::data_type::PbTypeName; use risingwave_pb::plan_common::{PbEncodeType, PbFormatType}; @@ -857,12 +858,7 @@ mod test { pb_eq( a, "repeated_int_field", - S::List(ListValue::new( - m.repeated_int_field - .iter() - .map(|&x| Some(x.into())) - .collect(), - )), + S::List(ListValue::from_iter(m.repeated_int_field.clone())), ); pb_eq( a, diff --git a/src/connector/src/parser/unified/avro.rs b/src/connector/src/parser/unified/avro.rs index 9712eb3c6abbe..a0c84ee6f400b 100644 --- a/src/connector/src/parser/unified/avro.rs +++ b/src/connector/src/parser/unified/avro.rs @@ -231,31 +231,19 @@ impl<'a> AvroParseOptions<'a> { ScalarImpl::Struct(StructValue::new(rw_values)) } // ---- List ----- - (Some(DataType::List(item_type)), Value::Array(arr)) => ListValue::new( - arr.iter() - .map(|v| { - let schema = self.extract_inner_schema(None); - Self { - schema, - relax_numeric: self.relax_numeric, - } - .parse(v, Some(item_type)) - }) - .collect::, AccessError>>()?, - ) - .into(), - (None, Value::Array(arr)) => ListValue::new( - arr.iter() - .map(|v| { - let schema = self.extract_inner_schema(None); - Self { - schema, - relax_numeric: self.relax_numeric, - } - .parse(v, None) - }) - .collect::, AccessError>>()?, - ) + (Some(DataType::List(item_type)), Value::Array(array)) => ListValue::new({ + let schema = self.extract_inner_schema(None); + let mut builder = item_type.create_array_builder(array.len()); + for v in array { + let value = Self { + schema, + relax_numeric: self.relax_numeric, + } + .parse(v, Some(item_type))?; + builder.append(value); + } + builder.finish() + }) .into(), // ---- Bytea ----- (Some(DataType::Bytea) | None, Value::Bytes(value)) => { diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index 9033cb049a465..f4c8b303069e7 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -496,24 +496,17 @@ impl JsonParseOptions { } // ---- List ----- - (Some(DataType::List(item_type)), ValueType::Array) => ListValue::new( - value - .as_array() - .unwrap() - .iter() - .map(|v| self.parse(v, Some(item_type))) - .collect::, _>>()?, - ) - .into(), - (None, ValueType::Array) => ListValue::new( - value - .as_array() - .unwrap() - .iter() - .map(|v| self.parse(v, None)) - .collect::, _>>()?, - ) + (Some(DataType::List(item_type)), ValueType::Array) => ListValue::new({ + let array = value.as_array().unwrap(); + let mut builder = item_type.create_array_builder(array.len()); + for v in array { + let value = self.parse(v, Some(item_type))?; + builder.append(value); + } + builder.finish() + }) .into(), + // ---- Bytea ----- (Some(DataType::Bytea), ValueType::String) => match self.bytea_handling { ByteaHandling::Standard => str_to_bytea(value.as_str().unwrap()) diff --git a/src/connector/src/sink/encoder/avro.rs b/src/connector/src/sink/encoder/avro.rs index f9c7f8f15dbe3..a40c140900b27 100644 --- a/src/connector/src/sink/encoder/avro.rs +++ b/src/connector/src/sink/encoder/avro.rs @@ -765,31 +765,21 @@ mod tests { test_ok( &DataType::List(DataType::Int32.into()), - Some(ScalarImpl::List(ListValue::new(vec![ - Some(ScalarImpl::Int32(4)), - Some(ScalarImpl::Int32(5)), - ]))), + Some(ScalarImpl::List(ListValue::from_iter([4, 5]))), avro_schema, Value::Array(vec![Value::Int(4), Value::Int(5)]), ); test_err( &DataType::List(DataType::Int32.into()), - Some(ScalarImpl::List(ListValue::new(vec![ - Some(ScalarImpl::Int32(4)), - None, - ]))) - .to_datum_ref(), + Some(ScalarImpl::List(ListValue::from_iter([Some(4), None]))).to_datum_ref(), avro_schema, "encode error: found null but required", ); test_ok( &DataType::List(DataType::Int32.into()), - Some(ScalarImpl::List(ListValue::new(vec![ - Some(ScalarImpl::Int32(4)), - None, - ]))), + Some(ScalarImpl::List(ListValue::from_iter([Some(4), None]))), r#"{ "type": "array", "items": ["null", "int"] @@ -802,15 +792,9 @@ mod tests { test_ok( &DataType::List(DataType::List(DataType::Int32.into()).into()), - Some(ScalarImpl::List(ListValue::new(vec![ - Some(ScalarImpl::List(ListValue::new(vec![ - Some(ScalarImpl::Int32(26)), - Some(ScalarImpl::Int32(29)), - ]))), - Some(ScalarImpl::List(ListValue::new(vec![ - Some(ScalarImpl::Int32(46)), - Some(ScalarImpl::Int32(49)), - ]))), + Some(ScalarImpl::List(ListValue::from_iter([ + ListValue::from_iter([26, 29]), + ListValue::from_iter([46, 49]), ]))), r#"{ "type": "array", diff --git a/src/connector/src/sink/encoder/proto.rs b/src/connector/src/sink/encoder/proto.rs index 489023f80e145..a4df7b7665ef1 100644 --- a/src/connector/src/sink/encoder/proto.rs +++ b/src/connector/src/sink/encoder/proto.rs @@ -363,11 +363,7 @@ mod tests { Some(ScalarImpl::Int32(1)), Some(ScalarImpl::Utf8("".into())), ]))), - Some(ScalarImpl::List(ListValue::new(vec![ - Some(ScalarImpl::Int32(4)), - Some(ScalarImpl::Int32(0)), - Some(ScalarImpl::Int32(4)), - ]))), + Some(ScalarImpl::List(ListValue::from_iter([4, 0, 4]))), Some(ScalarImpl::Timestamptz(Timestamptz::from_micros(3))), ]); @@ -420,11 +416,11 @@ mod tests { DataType::List(DataType::Int32.into()), "repeated_int_field", )]); - let row = OwnedRow::new(vec![Some(ScalarImpl::List(ListValue::new(vec![ - Some(ScalarImpl::Int32(0)), + let row = OwnedRow::new(vec![Some(ScalarImpl::List(ListValue::from_iter([ + Some(0), None, - Some(ScalarImpl::Int32(2)), - Some(ScalarImpl::Int32(3)), + Some(2), + Some(3), ])))]); let err = encode_fields( diff --git a/src/expr/core/src/expr/expr_array_transform.rs b/src/expr/core/src/expr/expr_array_transform.rs index 016fad81074dd..19b678db87559 100644 --- a/src/expr/core/src/expr/expr_array_transform.rs +++ b/src/expr/core/src/expr/expr_array_transform.rs @@ -51,13 +51,10 @@ impl Expression for ArrayTransformExpression { let lambda_input = self.array.eval_row(input).await?; let lambda_input = lambda_input.map(ScalarImpl::into_list); if let Some(lambda_input) = lambda_input { - let mut new_vals = Vec::with_capacity(lambda_input.values().len()); - for val in lambda_input.values() { - let row = OwnedRow::new(vec![val.clone()]); - let res = self.lambda.eval_row(&row).await?; - new_vals.push(res); - } - let new_list = ListValue::new(new_vals); + let len = lambda_input.len(); + let chunk = DataChunk::new(vec![Arc::new(lambda_input.into_array())], len); + let new_vals = self.lambda.eval(&chunk).await?; + let new_list = ListValue::new(Arc::unwrap_or_clone(new_vals)); Ok(Some(new_list.into())) } else { Ok(None) diff --git a/src/expr/core/src/expr/expr_some_all.rs b/src/expr/core/src/expr/expr_some_all.rs index 8978824bef0bc..2cc84a0be79fd 100644 --- a/src/expr/core/src/expr/expr_some_all.rs +++ b/src/expr/core/src/expr/expr_some_all.rs @@ -14,10 +14,9 @@ use std::sync::Arc; -use itertools::Itertools; use risingwave_common::array::{Array, ArrayRef, BoolArray, DataChunk}; use risingwave_common::row::OwnedRow; -use risingwave_common::types::{DataType, Datum, ListRef, Scalar, ScalarImpl, ScalarRefImpl}; +use risingwave_common::types::{DataType, Datum, Scalar, ScalarRefImpl}; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::{bail, ensure}; use risingwave_pb::expr::expr_node::{RexNode, Type}; @@ -50,22 +49,35 @@ impl SomeAllExpression { } } - fn resolve_boolean_vec(&self, boolean_vec: Vec>) -> Option { + fn resolve_bools(&self, bools: impl Iterator>) -> Option { match self.expr_type { Type::Some => { - if boolean_vec.iter().any(|b| b.unwrap_or(false)) { - Some(true) - } else if boolean_vec.iter().any(|b| b.is_none()) { + let mut any_none = false; + for b in bools { + match b { + Some(true) => return Some(true), + Some(false) => continue, + None => any_none = true, + } + } + if any_none { None } else { Some(false) } } Type::All => { - if boolean_vec.iter().all(|b| b.unwrap_or(false)) { + let mut all_true = true; + for b in bools { + if b == Some(false) { + return Some(false); + } + if b != Some(true) { + all_true = false; + } + } + if all_true { Some(true) - } else if boolean_vec.iter().any(|b| !b.unwrap_or(true)) { - Some(false) } else { None } @@ -90,11 +102,7 @@ impl Expression for SomeAllExpression { let DataType::List(datatype) = arr_right_inner.data_type() else { unreachable!() }; - let capacity = arr_right_inner - .iter() - .flatten() - .map(ListRef::flatten_len) - .sum(); + let capacity = arr_right_inner.flatten().len(); let mut unfolded_arr_left_builder = arr_left.create_builder(capacity); let mut unfolded_arr_right_builder = datatype.create_array_builder(capacity); @@ -108,18 +116,13 @@ impl Expression for SomeAllExpression { return; } - let datum_right = right.unwrap(); - match datum_right { - ScalarRefImpl::List(array) => { - let flattened = array.flatten(); - let len = flattened.len(); - num_array.push(Some(len)); - unfolded_arr_left_builder.append_n(len, left); - for item in flattened { - unfolded_arr_right_builder.append(item); - } - } - _ => unreachable!(), + let array = right.unwrap().into_list(); + let flattened = array.flatten(); + let len = flattened.len(); + num_array.push(Some(len)); + unfolded_arr_left_builder.append_n(len, left); + for item in flattened.iter() { + unfolded_arr_right_builder.append(item); } }; @@ -162,9 +165,7 @@ impl Expression for SomeAllExpression { num_array .into_iter() .map(|num| match num { - Some(num) => { - self.resolve_boolean_vec(func_results_iter.by_ref().take(num).collect_vec()) - } + Some(num) => self.resolve_bools(func_results_iter.by_ref().take(num)), None => None, }) .collect::() @@ -175,30 +176,25 @@ impl Expression for SomeAllExpression { async fn eval_row(&self, row: &OwnedRow) -> Result { let datum_left = self.left_expr.eval_row(row).await?; let datum_right = self.right_expr.eval_row(row).await?; - if let Some(array) = datum_right { - match array { - ScalarImpl::List(array) => { - let mut scalar_vec = Vec::with_capacity(array.values().len()); - for d in array.values() { - let e = self - .func - .eval_row(&OwnedRow::new(vec![datum_left.clone(), d.clone()])) - .await?; - scalar_vec.push(e); - } - let boolean_vec = scalar_vec - .into_iter() - .map(|scalar_ref| scalar_ref.map(|s| s.into_bool())) - .collect_vec(); - Ok(self - .resolve_boolean_vec(boolean_vec) - .map(|b| b.to_scalar_value())) - } - _ => unreachable!(), - } - } else { - Ok(None) - } + let Some(array_right) = datum_right else { + return Ok(None); + }; + let array_right = array_right.into_list().into_array(); + let len = array_right.len(); + + // expand left to array + let array_left = { + let mut builder = self.left_expr.return_type().create_array_builder(len); + builder.append_n(len, datum_left); + builder.finish().into_ref() + }; + + let chunk = DataChunk::new(vec![array_left, Arc::new(array_right)], len); + let bools = self.func.eval(&chunk).await?; + + Ok(self + .resolve_bools(bools.as_bool().iter()) + .map(|b| b.to_scalar_value())) } } diff --git a/src/expr/impl/src/aggregate/approx_count_distinct/mod.rs b/src/expr/impl/src/aggregate/approx_count_distinct/mod.rs index d5b9d715b7317..18f56b67edf86 100644 --- a/src/expr/impl/src/aggregate/approx_count_distinct/mod.rs +++ b/src/expr/impl/src/aggregate/approx_count_distinct/mod.rs @@ -171,28 +171,24 @@ impl AggregateFunction for AppendOnlyApproxCountDistinct { result[start_idx + 1] |= (bucket_val.0 as u64) >> (i64::BITS - begin_bit as u32); } } - Ok(Some(ScalarImpl::List(ListValue::new( - result - .into_iter() - .map(|x| Some(ScalarImpl::Int64(x as i64))) - .collect(), + Ok(Some(ScalarImpl::List(ListValue::from_iter( + result.into_iter().map(|v| v as i64), )))) } fn decode_state(&self, datum: Datum) -> Result { let scalar = datum.unwrap(); - let list = scalar.as_list().values(); + let list = scalar.as_list(); let bucket_num = list.len() * i64::BITS as usize / LOG_COUNT_BITS as usize; let registers = (0..bucket_num) .map(|i| { let (start_idx, begin_bit, post_end_bit) = pos_in_serialized(i); - let val = *list[start_idx].as_ref().unwrap().as_int64(); + let val = list.get(start_idx).unwrap().unwrap().into_int64() as u64; let v = if post_end_bit <= i64::BITS { - (val as u64) << (i64::BITS - post_end_bit) - >> (i64::BITS - LOG_COUNT_BITS as u32) + val << (i64::BITS - post_end_bit) >> (i64::BITS - LOG_COUNT_BITS as u32) } else { - ((val as u64) >> begin_bit) - + (((*list[start_idx + 1].as_ref().unwrap().as_int64() as u64) + (val >> begin_bit) + + (((list.get(start_idx + 1).unwrap().unwrap().into_int64() as u64) & ((1 << (post_end_bit - i64::BITS)) - 1)) << (i64::BITS - begin_bit as u32)) }; diff --git a/src/expr/impl/src/aggregate/array_agg.rs b/src/expr/impl/src/aggregate/array_agg.rs index 963d56ed08621..e50b08a6e7725 100644 --- a/src/expr/impl/src/aggregate/array_agg.rs +++ b/src/expr/impl/src/aggregate/array_agg.rs @@ -12,15 +12,40 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::array::ListValue; -use risingwave_common::types::{Datum, ScalarRefImpl, ToOwnedDatum}; +use risingwave_common::array::ArrayBuilderImpl; +use risingwave_common::estimate_size::EstimateSize; +use risingwave_common::types::{Datum, ListValue, ScalarRefImpl}; use risingwave_expr::aggregate; +use risingwave_expr::aggregate::AggStateDyn; +use risingwave_expr::expr::Context; #[aggregate("array_agg(any) -> anyarray")] -fn array_agg(state: Option, value: Option>) -> ListValue { - let mut state: Vec = state.unwrap_or_default().into(); - state.push(value.to_owned_datum()); - state.into() +fn array_agg(state: &mut ArrayAggState, value: Option>, ctx: &Context) { + state + .0 + .get_or_insert_with(|| ctx.arg_types[0].create_array_builder(1)) + .append(value); +} + +#[derive(Debug, Clone, Default)] +struct ArrayAggState(Option); + +impl EstimateSize for ArrayAggState { + fn estimated_heap_size(&self) -> usize { + self.0.estimated_heap_size() + } +} + +impl AggStateDyn for ArrayAggState {} + +/// Finishes aggregation and returns the result. +impl From<&ArrayAggState> for Datum { + fn from(state: &ArrayAggState) -> Self { + state + .0 + .as_ref() + .map(|b| ListValue::new(b.clone().finish()).into()) + } } #[cfg(test)] @@ -42,10 +67,7 @@ mod tests { let mut state = array_agg.create_state(); array_agg.update(&mut state, &chunk).await?; let actual = array_agg.get_result(&state).await?; - assert_eq!( - actual, - Some(ListValue::new(vec![Some(123.into()), Some(456.into()), Some(789.into())]).into()) - ); + assert_eq!(actual, Some(ListValue::from_iter([123, 456, 789]).into())); Ok(()) } @@ -63,7 +85,7 @@ mod tests { array_agg.update(&mut state, &chunk).await?; assert_eq!( array_agg.get_result(&state).await?, - Some(ListValue::new(vec![None]).into()) + Some(ListValue::from_iter([None::]).into()) ); Ok(()) } diff --git a/src/expr/impl/src/aggregate/bit_and.rs b/src/expr/impl/src/aggregate/bit_and.rs index 879f81704b14a..f803d5679c99d 100644 --- a/src/expr/impl/src/aggregate/bit_and.rs +++ b/src/expr/impl/src/aggregate/bit_and.rs @@ -15,7 +15,8 @@ use std::marker::PhantomData; use std::ops::BitAnd; -use risingwave_common::types::{ListRef, ListValue, ScalarImpl}; +use risingwave_common::array::I64Array; +use risingwave_common::types::{ListRef, ListValue}; use risingwave_expr::aggregate; /// Computes the bitwise AND of all non-null input values. @@ -109,15 +110,13 @@ impl BitAndUpdatable { // state is the number of 0s for each bit. fn create_state(&self) -> ListValue { - ListValue::new(vec![Some(ScalarImpl::Int64(0)); T::BITS]) + ListValue::new(I64Array::from_iter(std::iter::repeat(0).take(T::BITS)).into()) } fn accumulate(&self, mut state: ListValue, input: T) -> ListValue { - for i in 0..T::BITS { + let counts = state.as_i64_mut_slice().expect("invalid state"); + for (i, count) in counts.iter_mut().enumerate() { if !input.get_bit(i) { - let Some(ScalarImpl::Int64(count)) = &mut state[i] else { - panic!("invalid state"); - }; *count += 1; } } @@ -125,11 +124,9 @@ impl BitAndUpdatable { } fn retract(&self, mut state: ListValue, input: T) -> ListValue { - for i in 0..T::BITS { + let counts = state.as_i64_mut_slice().expect("invalid state"); + for (i, count) in counts.iter_mut().enumerate() { if !input.get_bit(i) { - let Some(ScalarImpl::Int64(count)) = &mut state[i] else { - panic!("invalid state"); - }; *count -= 1; } } @@ -137,10 +134,10 @@ impl BitAndUpdatable { } fn finalize(&self, state: ListRef<'_>) -> T { + let counts = state.as_i64_slice().expect("invalid state"); let mut result = T::default(); - for i in 0..T::BITS { - let count = state.get(i).unwrap().unwrap().into_int64(); - if count == 0 { + for (i, count) in counts.iter().enumerate() { + if *count == 0 { result.set_bit(i); } } diff --git a/src/expr/impl/src/aggregate/bit_or.rs b/src/expr/impl/src/aggregate/bit_or.rs index 1bf205f335e8b..d610e03e095f6 100644 --- a/src/expr/impl/src/aggregate/bit_or.rs +++ b/src/expr/impl/src/aggregate/bit_or.rs @@ -15,7 +15,8 @@ use std::marker::PhantomData; use std::ops::BitOr; -use risingwave_common::types::{ListRef, ListValue, ScalarImpl}; +use risingwave_common::array::I64Array; +use risingwave_common::types::{ListRef, ListValue}; use risingwave_expr::aggregate; use super::bit_and::Bits; @@ -109,15 +110,13 @@ impl BitOrUpdatable { // state is the number of 1s for each bit. fn create_state(&self) -> ListValue { - ListValue::new(vec![Some(ScalarImpl::Int64(0)); T::BITS]) + ListValue::new(I64Array::from_iter(std::iter::repeat(0).take(T::BITS)).into()) } fn accumulate(&self, mut state: ListValue, input: T) -> ListValue { - for i in 0..T::BITS { + let counts = state.as_i64_mut_slice().expect("invalid state"); + for (i, count) in counts.iter_mut().enumerate() { if input.get_bit(i) { - let Some(ScalarImpl::Int64(count)) = &mut state[i] else { - panic!("invalid state"); - }; *count += 1; } } @@ -125,11 +124,9 @@ impl BitOrUpdatable { } fn retract(&self, mut state: ListValue, input: T) -> ListValue { - for i in 0..T::BITS { + let counts = state.as_i64_mut_slice().expect("invalid state"); + for (i, count) in counts.iter_mut().enumerate() { if input.get_bit(i) { - let Some(ScalarImpl::Int64(count)) = &mut state[i] else { - panic!("invalid state"); - }; *count -= 1; } } @@ -137,10 +134,10 @@ impl BitOrUpdatable { } fn finalize(&self, state: ListRef<'_>) -> T { + let counts = state.as_i64_slice().expect("invalid state"); let mut result = T::default(); - for i in 0..T::BITS { - let count = state.get(i).unwrap().unwrap().into_int64(); - if count != 0 { + for (i, count) in counts.iter().enumerate() { + if *count != 0 { result.set_bit(i); } } diff --git a/src/expr/impl/src/aggregate/general.rs b/src/expr/impl/src/aggregate/general.rs index 993b567590263..ee7ec24cce87f 100644 --- a/src/expr/impl/src/aggregate/general.rs +++ b/src/expr/impl/src/aggregate/general.rs @@ -348,7 +348,7 @@ mod tests { test_agg( "(min:int4[] $0:int4[])", input, - Some(ListValue::new(vec![Some(0i32.into())]).into()), + Some(ListValue::from_iter([0]).into()), ); } diff --git a/src/expr/impl/src/aggregate/jsonb_agg.rs b/src/expr/impl/src/aggregate/jsonb_agg.rs index cbe2cfbd44587..f6b193c9e2715 100644 --- a/src/expr/impl/src/aggregate/jsonb_agg.rs +++ b/src/expr/impl/src/aggregate/jsonb_agg.rs @@ -13,7 +13,7 @@ // limitations under the License. use risingwave_common::estimate_size::EstimateSize; -use risingwave_common::types::{JsonbVal, ScalarImpl}; +use risingwave_common::types::{Datum, JsonbVal}; use risingwave_expr::aggregate::AggStateDyn; use risingwave_expr::expr::Context; use risingwave_expr::{aggregate, ExprError, Result}; @@ -70,13 +70,13 @@ impl Default for JsonbArrayState { } /// Finishes aggregation and returns the result. -impl From<&JsonbArrayState> for ScalarImpl { +impl From<&JsonbArrayState> for Datum { fn from(builder: &JsonbArrayState) -> Self { // TODO: avoid clone let mut builder = builder.0.clone(); builder.end_array(); let jsonb: JsonbVal = builder.finish().into(); - jsonb.into() + Some(jsonb.into()) } } @@ -101,12 +101,12 @@ impl Default for JsonbObjectState { } /// Finishes aggregation and returns the result. -impl From<&JsonbObjectState> for ScalarImpl { +impl From<&JsonbObjectState> for Datum { fn from(builder: &JsonbObjectState) -> Self { // TODO: avoid clone let mut builder = builder.0.clone(); builder.end_object(); let jsonb: JsonbVal = builder.finish().into(); - jsonb.into() + Some(jsonb.into()) } } diff --git a/src/expr/impl/src/scalar/array.rs b/src/expr/impl/src/scalar/array.rs index 7df75de8809f3..75cbfb1f288c6 100644 --- a/src/expr/impl/src/scalar/array.rs +++ b/src/expr/impl/src/scalar/array.rs @@ -15,11 +15,12 @@ use risingwave_common::array::{ListValue, StructValue}; use risingwave_common::row::Row; use risingwave_common::types::ToOwnedDatum; +use risingwave_expr::expr::Context; use risingwave_expr::function; #[function("array(...) -> anyarray", type_infer = "panic")] -fn array(row: impl Row) -> ListValue { - ListValue::new(row.iter().map(|d| d.to_owned_datum()).collect()) +fn array(row: impl Row, ctx: &Context) -> ListValue { + ListValue::from_datum_iter(ctx.return_type.as_list(), row.iter()) } #[function("row(...) -> struct", type_infer = "panic")] diff --git a/src/expr/impl/src/scalar/array_access.rs b/src/expr/impl/src/scalar/array_access.rs index 2ac39be99ac8c..e9af2ac6663e5 100644 --- a/src/expr/impl/src/scalar/array_access.rs +++ b/src/expr/impl/src/scalar/array_access.rs @@ -30,18 +30,14 @@ fn array_access(list: ListRef<'_>, index: i32) -> Option> { mod tests { use risingwave_common::array::ListValue; - use risingwave_common::types::ScalarImpl; + use risingwave_common::types::Scalar; use super::*; #[test] fn test_int4_array_access() { - let v1 = ListValue::new(vec![ - Some(ScalarImpl::Int32(1)), - Some(ScalarImpl::Int32(2)), - Some(ScalarImpl::Int32(3)), - ]); - let l1 = ListRef::ValueRef { val: &v1 }; + let v1 = ListValue::from_iter([1, 2, 3]); + let l1 = v1.as_scalar_ref(); assert_eq!(array_access(l1, 1), Some(1.into())); assert_eq!(array_access(l1, -1), None); @@ -51,49 +47,24 @@ mod tests { #[test] fn test_utf8_array_access() { - let v1 = ListValue::new(vec![ - Some(ScalarImpl::Utf8("来自".into())), - Some(ScalarImpl::Utf8("foo".into())), - Some(ScalarImpl::Utf8("bar".into())), - ]); - let v2 = ListValue::new(vec![ - Some(ScalarImpl::Utf8("fizz".into())), - Some(ScalarImpl::Utf8("荷兰".into())), - Some(ScalarImpl::Utf8("buzz".into())), - ]); - let v3 = ListValue::new(vec![None, None, Some(ScalarImpl::Utf8("的爱".into()))]); - - let l1 = ListRef::ValueRef { val: &v1 }; - let l2 = ListRef::ValueRef { val: &v2 }; - let l3 = ListRef::ValueRef { val: &v3 }; + let v1 = ListValue::from_iter(["来自", "foo", "bar"]); + let v2 = ListValue::from_iter(["fizz", "荷兰", "buzz"]); + let v3 = ListValue::from_iter([None, None, Some("的爱")]); - assert_eq!(array_access(l1, 1), Some("来自".into())); - assert_eq!(array_access(l2, 2), Some("荷兰".into())); - assert_eq!(array_access(l3, 3), Some("的爱".into())); + assert_eq!(array_access(v1.as_scalar_ref(), 1), Some("来自".into())); + assert_eq!(array_access(v2.as_scalar_ref(), 2), Some("荷兰".into())); + assert_eq!(array_access(v3.as_scalar_ref(), 3), Some("的爱".into())); } #[test] fn test_nested_array_access() { - let v = ListValue::new(vec![ - Some(ScalarImpl::List(ListValue::new(vec![ - Some(ScalarImpl::Utf8("foo".into())), - Some(ScalarImpl::Utf8("bar".into())), - ]))), - Some(ScalarImpl::List(ListValue::new(vec![ - Some(ScalarImpl::Utf8("fizz".into())), - Some(ScalarImpl::Utf8("buzz".into())), - ]))), + let v = ListValue::from_iter([ + ListValue::from_iter(["foo", "bar"]), + ListValue::from_iter(["fizz", "buzz"]), ]); - let l = ListRef::ValueRef { val: &v }; assert_eq!( - array_access(l, 1), - Some( - ListRef::from(&ListValue::new(vec![ - Some(ScalarImpl::Utf8("foo".into())), - Some(ScalarImpl::Utf8("bar".into())), - ])) - .into() - ) + array_access(v.as_scalar_ref(), 1), + Some(ListValue::from_iter(["foo", "bar"]).as_scalar_ref().into()) ); } } diff --git a/src/expr/impl/src/scalar/array_concat.rs b/src/expr/impl/src/scalar/array_concat.rs index ab63c6f1f7529..b7fd289ab100a 100644 --- a/src/expr/impl/src/scalar/array_concat.rs +++ b/src/expr/impl/src/scalar/array_concat.rs @@ -13,7 +13,7 @@ // limitations under the License. use risingwave_common::array::{ListRef, ListValue}; -use risingwave_common::types::{Datum, ScalarRef, ScalarRefImpl}; +use risingwave_common::types::{ScalarRef, ScalarRefImpl}; use risingwave_expr::expr::Context; use risingwave_expr::function; @@ -93,38 +93,37 @@ fn array_cat( right: Option>, ctx: &Context, ) -> Option { - let elems: Vec = if ctx.arg_types[0] == ctx.arg_types[1] { + Some(if ctx.arg_types[0] == ctx.arg_types[1] { // array || array let (Some(left), Some(right)) = (left, right) else { return left.or(right).map(|list| list.to_owned_scalar()); }; - left.iter() - .chain(right.iter()) - .map(|x| x.map(ScalarRefImpl::into_scalar_impl)) - .collect() + ListValue::from_datum_iter(ctx.arg_types[0].as_list(), left.iter().chain(right.iter())) } else if ctx.arg_types[0].as_list() == &ctx.arg_types[1] { // array[] || array let Some(right) = right else { return left.map(|left| left.to_owned_scalar()); }; - left.iter() - .flat_map(|list| list.iter()) - .chain([Some(right.into())]) - .map(|x| x.map(ScalarRefImpl::into_scalar_impl)) - .collect() + ListValue::from_datum_iter( + &ctx.arg_types[1], + left.iter() + .flat_map(|list| list.iter()) + .chain([Some(right.into())]), + ) } else if &ctx.arg_types[0] == ctx.arg_types[1].as_list() { // array || array[] let Some(left) = left else { return right.map(|right| right.to_owned_scalar()); }; - std::iter::once(Some(left.into())) - .chain(right.iter().flat_map(|list| list.iter())) - .map(|x| x.map(ScalarRefImpl::into_scalar_impl)) - .collect() + ListValue::from_datum_iter( + &ctx.arg_types[0], + [Some(left.into())] + .into_iter() + .chain(right.iter().flat_map(|list| list.iter())), + ) } else { unreachable!() - }; - Some(ListValue::new(elems)) + }) } /// Appends a value as the back element of an array. @@ -154,13 +153,16 @@ fn array_cat( /// {NULL} /// ``` #[function("array_append(anyarray, any) -> anyarray")] -fn array_append(left: Option>, right: Option>) -> ListValue { - ListValue::new( +fn array_append( + left: Option>, + right: Option>, + ctx: &Context, +) -> ListValue { + ListValue::from_datum_iter( + &ctx.arg_types[1], left.iter() .flat_map(|list| list.iter()) - .chain(std::iter::once(right)) - .map(|x| x.map(ScalarRefImpl::into_scalar_impl)) - .collect(), + .chain(std::iter::once(right)), ) } @@ -191,11 +193,13 @@ fn array_append(left: Option>, right: Option>) -> /// {NULL} /// ``` #[function("array_prepend(any, anyarray) -> anyarray")] -fn array_prepend(left: Option>, right: Option>) -> ListValue { - ListValue::new( - std::iter::once(left) - .chain(right.iter().flat_map(|list| list.iter())) - .map(|x| x.map(ScalarRefImpl::into_scalar_impl)) - .collect(), +fn array_prepend( + left: Option>, + right: Option>, + ctx: &Context, +) -> ListValue { + ListValue::from_datum_iter( + &ctx.arg_types[0], + std::iter::once(left).chain(right.iter().flat_map(|list| list.iter())), ) } diff --git a/src/expr/impl/src/scalar/array_contain.rs b/src/expr/impl/src/scalar/array_contain.rs index 5eb7f6a861d2d..526a2ad1c3658 100644 --- a/src/expr/impl/src/scalar/array_contain.rs +++ b/src/expr/impl/src/scalar/array_contain.rs @@ -63,27 +63,19 @@ fn array_contained(left: ListRef<'_>, right: ListRef<'_>) -> bool { #[cfg(test)] mod tests { - use risingwave_common::types::{ListValue, ScalarImpl}; + use risingwave_common::types::{ListValue, Scalar}; use super::*; #[test] fn test_contains() { assert!(array_contains( - ListRef::ValueRef { - val: &ListValue::new(vec![Some(ScalarImpl::Int32(2)), Some(ScalarImpl::Int32(3))]), - }, - ListRef::ValueRef { - val: &ListValue::new(vec![Some(ScalarImpl::Int32(2))]), - } + ListValue::from_iter([2, 3]).as_scalar_ref(), + ListValue::from_iter([2]).as_scalar_ref(), )); assert!(!array_contains( - ListRef::ValueRef { - val: &ListValue::new(vec![Some(ScalarImpl::Int32(2)), Some(ScalarImpl::Int32(3))]), - }, - ListRef::ValueRef { - val: &ListValue::new(vec![Some(ScalarImpl::Int32(5))]), - } + ListValue::from_iter([2, 3]).as_scalar_ref(), + ListValue::from_iter([5]).as_scalar_ref(), )); } } diff --git a/src/expr/impl/src/scalar/array_distinct.rs b/src/expr/impl/src/scalar/array_distinct.rs index a334506631bf8..184dcddbbd5bc 100644 --- a/src/expr/impl/src/scalar/array_distinct.rs +++ b/src/expr/impl/src/scalar/array_distinct.rs @@ -14,7 +14,6 @@ use itertools::Itertools; use risingwave_common::array::*; -use risingwave_common::types::ToOwnedDatum; use risingwave_expr::function; /// Returns a new array removing all the duplicates from the input array @@ -52,7 +51,7 @@ use risingwave_expr::function; #[function("array_distinct(anyarray) -> anyarray")] pub fn array_distinct(list: ListRef<'_>) -> ListValue { - ListValue::new(list.iter().unique().map(|x| x.to_owned_datum()).collect()) + ListValue::from_datum_iter(&list.data_type(), list.iter().unique()) } #[cfg(test)] @@ -63,8 +62,8 @@ mod tests { #[test] fn test_array_distinct_array_of_primitives() { - let array = ListValue::new([42, 43, 42].into_iter().map(|x| Some(x.into())).collect()); - let expected = ListValue::new([42, 43].into_iter().map(|x| Some(x.into())).collect()); + let array = ListValue::from_iter([42, 43, 42]); + let expected = ListValue::from_iter([42, 43]); let actual = array_distinct(array.as_scalar_ref()); assert_eq!(actual, expected); } diff --git a/src/expr/impl/src/scalar/array_positions.rs b/src/expr/impl/src/scalar/array_positions.rs index 6058730344485..d8b91d038dd7c 100644 --- a/src/expr/impl/src/scalar/array_positions.rs +++ b/src/expr/impl/src/scalar/array_positions.rs @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::array::{ListRef, ListValue}; -use risingwave_common::types::{ScalarImpl, ScalarRefImpl}; +use risingwave_common::array::{I32Array, ListRef, ListValue}; +use risingwave_common::types::ScalarRefImpl; use risingwave_expr::{function, ExprError, Result}; /// Returns the subscript of the first occurrence of the second argument in the array, or `NULL` if @@ -197,7 +197,8 @@ fn array_positions( values .enumerate() .filter(|(_, item)| item == &element) - .map(|(idx, _)| Some(ScalarImpl::Int32((idx + 1) as _))) - .collect(), + .map(|(idx, _)| idx as i32 + 1) + .collect::() + .into(), ))) } diff --git a/src/expr/impl/src/scalar/array_range_access.rs b/src/expr/impl/src/scalar/array_range_access.rs index 2782a97a7147f..4a833e807ac73 100644 --- a/src/expr/impl/src/scalar/array_range_access.rs +++ b/src/expr/impl/src/scalar/array_range_access.rs @@ -13,23 +13,20 @@ // limitations under the License. use risingwave_common::array::{ListRef, ListValue}; -use risingwave_common::types::ToOwnedDatum; use risingwave_expr::function; /// If the case is `array[1,2,3][:2]`, then start will be 0 set by the frontend /// If the case is `array[1,2,3][1:]`, then end will be `i32::MAX` set by the frontend #[function("array_range_access(anyarray, int4, int4) -> anyarray")] pub fn array_range_access(list: ListRef<'_>, start: i32, end: i32) -> Option { - let mut data = vec![]; let list_all_values = list.iter(); let start = std::cmp::max(start, 1) as usize; let end = std::cmp::min(std::cmp::max(0, end), list_all_values.len() as i32) as usize; if start > end { - return Some(ListValue::new(data)); + return Some(ListValue::empty(&list.data_type())); } - - for datumref in list_all_values.take(end).skip(start - 1) { - data.push(datumref.to_owned_datum()); - } - Some(ListValue::new(data)) + Some(ListValue::from_datum_iter( + &list.data_type(), + list_all_values.take(end).skip(start - 1), + )) } diff --git a/src/expr/impl/src/scalar/array_remove.rs b/src/expr/impl/src/scalar/array_remove.rs index b26be50cc3954..0abe8c7fbb231 100644 --- a/src/expr/impl/src/scalar/array_remove.rs +++ b/src/expr/impl/src/scalar/array_remove.rs @@ -13,7 +13,7 @@ // limitations under the License. use risingwave_common::array::{ListRef, ListValue}; -use risingwave_common::types::{ScalarRefImpl, ToOwnedDatum}; +use risingwave_common::types::ScalarRefImpl; use risingwave_expr::function; /// Removes all elements equal to the given value from the array. @@ -68,11 +68,9 @@ use risingwave_expr::function; /// ``` #[function("array_remove(anyarray, any) -> anyarray")] fn array_remove(array: Option>, elem: Option>) -> Option { - Some(ListValue::new( - array? - .iter() - .filter(|x| x != &elem) - .map(|x| x.to_owned_datum()) - .collect(), + let array = array?; + Some(ListValue::from_datum_iter( + &array.data_type(), + array.iter().filter(|x| x != &elem), )) } diff --git a/src/expr/impl/src/scalar/array_replace.rs b/src/expr/impl/src/scalar/array_replace.rs index 9764a3937fc42..9b2a40ba1e00b 100644 --- a/src/expr/impl/src/scalar/array_replace.rs +++ b/src/expr/impl/src/scalar/array_replace.rs @@ -13,7 +13,7 @@ // limitations under the License. use risingwave_common::array::{ListRef, ListValue}; -use risingwave_common::types::{ScalarRefImpl, ToOwnedDatum}; +use risingwave_common::types::ScalarRefImpl; use risingwave_expr::function; /// Replaces each array element equal to the second argument with the third argument. @@ -60,13 +60,12 @@ fn array_replace( elem_from: Option>, elem_to: Option>, ) -> Option { - Some(ListValue::new( - array? - .iter() - .map(|x| match x == elem_from { - true => elem_to.to_owned_datum(), - false => x.to_owned_datum(), - }) - .collect(), + let array = array?; + Some(ListValue::from_datum_iter( + &array.data_type(), + array.iter().map(|val| match val == elem_from { + true => elem_to, + false => val, + }), )) } diff --git a/src/expr/impl/src/scalar/array_sort.rs b/src/expr/impl/src/scalar/array_sort.rs index c48fe7608076e..e40815282b3d1 100644 --- a/src/expr/impl/src/scalar/array_sort.rs +++ b/src/expr/impl/src/scalar/array_sort.rs @@ -12,16 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +use itertools::Itertools; use risingwave_common::array::*; -use risingwave_common::types::{DatumRef, DefaultOrdered, ToOwnedDatum}; +use risingwave_common::types::DefaultOrdered; use risingwave_expr::function; #[function("array_sort(anyarray) -> anyarray")] -pub fn array_sort(list: ListRef<'_>) -> ListValue { - let mut v = list - .iter() - .map(DefaultOrdered) - .collect::>>>(); - v.sort(); - ListValue::new(v.into_iter().map(|x| x.0.to_owned_datum()).collect()) +pub fn array_sort(array: ListRef<'_>) -> ListValue { + ListValue::from_datum_iter( + &array.data_type(), + array.iter().map(DefaultOrdered).sorted().map(|v| v.0), + ) } diff --git a/src/expr/impl/src/scalar/array_to_string.rs b/src/expr/impl/src/scalar/array_to_string.rs index 0c98dcba262ea..884abffa22673 100644 --- a/src/expr/impl/src/scalar/array_to_string.rs +++ b/src/expr/impl/src/scalar/array_to_string.rs @@ -85,7 +85,7 @@ use risingwave_expr::function; fn array_to_string(array: ListRef<'_>, delimiter: &str, ctx: &Context, writer: &mut impl Write) { let element_data_type = ctx.arg_types[0].unnest_list(); let mut first = true; - for element in array.flatten() { + for element in array.flatten().iter() { let Some(element) = element else { continue }; if !first { write!(writer, "{}", delimiter).unwrap(); @@ -106,7 +106,7 @@ fn array_to_string_with_null( ) { let element_data_type = ctx.arg_types[0].unnest_list(); let mut first = true; - for element in array.flatten() { + for element in array.flatten().iter() { if !first { write!(writer, "{}", delimiter).unwrap(); } else { diff --git a/src/expr/impl/src/scalar/cast.rs b/src/expr/impl/src/scalar/cast.rs index 0f69e4cdad5e6..95a9e8055bc66 100644 --- a/src/expr/impl/src/scalar/cast.rs +++ b/src/expr/impl/src/scalar/cast.rs @@ -14,10 +14,11 @@ use std::fmt::Write; use std::str::FromStr; +use std::sync::Arc; use futures_util::FutureExt; use itertools::Itertools; -use risingwave_common::array::{ListRef, ListValue, StructRef, StructValue}; +use risingwave_common::array::{ArrayImpl, DataChunk, ListRef, ListValue, StructRef, StructValue}; use risingwave_common::cast; use risingwave_common::row::OwnedRow; use risingwave_common::types::{Int256, IntoOrdered, JsonbRef, ToText, F64}; @@ -203,16 +204,13 @@ fn list_cast(input: ListRef<'_>, ctx: &Context) -> Result { vec![InputRefExpression::new(ctx.arg_types[0].as_list().clone(), 0).boxed()], ) .unwrap(); - let elements = input.iter(); - let mut values = Vec::with_capacity(elements.len()); - for item in elements { - let v = cast - .eval_row(&OwnedRow::new(vec![item.map(|s| s.into_scalar_impl())])) // TODO: optimize - .now_or_never() - .unwrap()?; - values.push(v); - } - Ok(ListValue::new(values)) + let items = Arc::new(ArrayImpl::from(input.to_owned())); + let len = items.len(); + let list = cast + .eval(&DataChunk::new(vec![items], len)) + .now_or_never() + .unwrap()?; + Ok(ListValue::new(Arc::try_unwrap(list).unwrap())) } /// Cast struct of `source_elem_type` to `target_elem_type` by casting each element. @@ -305,13 +303,12 @@ mod tests { arg_types: vec![DataType::Varchar], return_type: DataType::from_str("int[]").unwrap(), }; - assert_eq!(str_to_list("{}", &ctx).unwrap(), ListValue::new(vec![])); + assert_eq!( + str_to_list("{}", &ctx).unwrap(), + ListValue::empty(&DataType::Varchar) + ); - let list123 = ListValue::new(vec![ - Some(1.to_scalar_value()), - Some(2.to_scalar_value()), - Some(3.to_scalar_value()), - ]); + let list123 = ListValue::from_iter([1, 2, 3]); // Single List let ctx = Context { @@ -321,23 +318,17 @@ mod tests { assert_eq!(str_to_list("{1, 2, 3}", &ctx).unwrap(), list123); // Nested List - let nested_list123 = ListValue::new(vec![Some(ScalarImpl::List(list123))]); + let nested_list123 = ListValue::from_iter([list123]); let ctx = Context { arg_types: vec![DataType::Varchar], return_type: DataType::from_str("int[][]").unwrap(), }; assert_eq!(str_to_list("{{1, 2, 3}}", &ctx).unwrap(), nested_list123); - let nested_list445566 = ListValue::new(vec![Some(ScalarImpl::List(ListValue::new(vec![ - Some(44.to_scalar_value()), - Some(55.to_scalar_value()), - Some(66.to_scalar_value()), - ])))]); + let nested_list445566 = ListValue::from_iter([ListValue::from_iter([44, 55, 66])]); - let double_nested_list123_445566 = ListValue::new(vec![ - Some(ScalarImpl::List(nested_list123.clone())), - Some(ScalarImpl::List(nested_list445566.clone())), - ]); + let double_nested_list123_445566 = + ListValue::from_iter([nested_list123.clone(), nested_list445566.clone()]); // Double nested List let ctx = Context { @@ -354,25 +345,9 @@ mod tests { arg_types: vec![DataType::from_str("int[][]").unwrap()], return_type: DataType::from_str("varchar[][]").unwrap(), }; - let double_nested_varchar_list123_445566 = ListValue::new(vec![ - Some(ScalarImpl::List( - list_cast( - ListRef::ValueRef { - val: &nested_list123, - }, - &ctx, - ) - .unwrap(), - )), - Some(ScalarImpl::List( - list_cast( - ListRef::ValueRef { - val: &nested_list445566, - }, - &ctx, - ) - .unwrap(), - )), + let double_nested_varchar_list123_445566 = ListValue::from_iter([ + list_cast(nested_list123.as_scalar_ref(), &ctx).unwrap(), + list_cast(nested_list445566.as_scalar_ref(), &ctx).unwrap(), ]); // Double nested Varchar List diff --git a/src/expr/impl/src/scalar/regexp.rs b/src/expr/impl/src/scalar/regexp.rs index 64234a8384dfe..09be16f8797bc 100644 --- a/src/expr/impl/src/scalar/regexp.rs +++ b/src/expr/impl/src/scalar/regexp.rs @@ -17,8 +17,7 @@ use std::str::FromStr; use fancy_regex::{Regex, RegexBuilder}; -use risingwave_common::array::ListValue; -use risingwave_common::types::ScalarImpl; +use risingwave_common::array::{ArrayBuilder, ListValue, Utf8Array, Utf8ArrayBuilder}; use risingwave_expr::{bail, function, ExprError, Result}; #[derive(Debug)] @@ -161,9 +160,9 @@ fn regexp_match(text: &str, regex: &RegexpContext) -> Option { let list = capture .iter() .skip(if skip_first { 1 } else { 0 }) - .map(|mat| mat.map(|m| m.as_str().into())) - .collect(); - Some(ListValue::new(list)) + .map(|mat| mat.map(|m| m.as_str())) + .collect::(); + Some(ListValue::new(list.into())) } #[function( @@ -462,7 +461,7 @@ fn regexp_replace( fn regexp_split_to_array(text: &str, regex: &RegexpContext) -> Option { let n = text.len(); let mut start = 0; - let mut list: Vec> = Vec::new(); + let mut builder = Utf8ArrayBuilder::new(0); let mut empty_flag = false; loop { @@ -492,7 +491,7 @@ fn regexp_split_to_array(text: &str, regex: &RegexpContext) -> Option start = begin; break; } - list.push(Some(text[start..begin + 1].into())); + builder.append(Some(&text[start..begin + 1])); start = end + 1; continue; } @@ -502,7 +501,7 @@ fn regexp_split_to_array(text: &str, regex: &RegexpContext) -> Option if !empty_flag { // We'll push an empty string to conform with postgres // If there does not exists a empty match before - list.push(Some("".to_string().into())); + builder.append(Some("")); } start = end; continue; @@ -510,7 +509,7 @@ fn regexp_split_to_array(text: &str, regex: &RegexpContext) -> Option if begin != 0 { // Normal case - list.push(Some(text[start..begin].into())); + builder.append(Some(&text[start..begin])); } // We should update the `start` no matter `begin` is zero or not @@ -521,12 +520,12 @@ fn regexp_split_to_array(text: &str, regex: &RegexpContext) -> Option // Push the extra text to the list // Note that this will implicitly push the entire text to the list // If there is no match, which is the expected behavior - list.push(Some(text[start..].into())); + builder.append(Some(&text[start..])); } if start == n && !empty_flag { - list.push(Some("".to_string().into())); + builder.append(Some("")); } - Some(ListValue::new(list)) + Some(ListValue::new(builder.finish().into())) } diff --git a/src/expr/impl/src/scalar/string_to_array.rs b/src/expr/impl/src/scalar/string_to_array.rs index a61cbda3ddfbe..6a7ad742e0e72 100644 --- a/src/expr/impl/src/scalar/string_to_array.rs +++ b/src/expr/impl/src/scalar/string_to_array.rs @@ -13,23 +13,21 @@ // limitations under the License. use auto_enums::auto_enum; -use itertools::Itertools; -use risingwave_common::array::ListValue; -use risingwave_common::types::ScalarImpl; +use risingwave_common::array::{ListValue, Utf8Array}; use risingwave_expr::function; #[auto_enum(Iterator)] -fn string_to_array_inner<'a>( - s: &'a str, - sep: Option<&'a str>, -) -> impl Iterator + 'a { +fn string_to_array_inner<'a>(s: &'a str, sep: Option<&'a str>) -> impl Iterator { match s.is_empty() { true => std::iter::empty(), #[nested] _ => match sep { - Some(sep) if sep.is_empty() => std::iter::once(s.to_string()), - Some(sep) => s.split(sep).map(|x| x.to_string()), - None => s.chars().map(|x| x.to_string()), + Some(sep) if sep.is_empty() => std::iter::once(s), + Some(sep) => s.split(sep), + None => s.char_indices().map(move |(index, ch)| { + let len = ch.len_utf8(); + &s[index..index + len] + }), }, } } @@ -38,9 +36,7 @@ fn string_to_array_inner<'a>( #[function("string_to_array(varchar, varchar) -> varchar[]")] pub fn string_to_array2(s: Option<&str>, sep: Option<&str>) -> Option { Some(ListValue::new( - string_to_array_inner(s?, sep) - .map(|x| Some(ScalarImpl::Utf8(x.into()))) - .collect_vec(), + string_to_array_inner(s?, sep).collect::().into(), )) } @@ -55,13 +51,8 @@ pub fn string_to_array3( }; Some(ListValue::new( string_to_array_inner(s?, sep) - .map(|x| { - if x == null { - None - } else { - Some(ScalarImpl::Utf8(x.into())) - } - }) - .collect_vec(), + .map(|x| if x == null { None } else { Some(x) }) + .collect::() + .into(), )) } diff --git a/src/expr/impl/src/scalar/trim_array.rs b/src/expr/impl/src/scalar/trim_array.rs index 3a9bbed9c0562..efaccbbb9e24c 100644 --- a/src/expr/impl/src/scalar/trim_array.rs +++ b/src/expr/impl/src/scalar/trim_array.rs @@ -13,7 +13,6 @@ // limitations under the License. use risingwave_common::array::{ListRef, ListValue}; -use risingwave_common::types::ToOwnedDatum; use risingwave_expr::{function, ExprError, Result}; /// Trims an array by removing the last n elements. If the array is multidimensional, only the first @@ -85,10 +84,8 @@ fn trim_array(array: ListRef<'_>, n: i32) -> Result { name: "n", reason: "more than array length".into(), })?; - Ok(ListValue::new( - values - .take(len_to_retain) - .map(|x| x.to_owned_datum()) - .collect(), + Ok(ListValue::from_datum_iter( + &array.data_type(), + values.take(len_to_retain), )) } diff --git a/src/expr/impl/src/table_function/regexp_matches.rs b/src/expr/impl/src/table_function/regexp_matches.rs index a5c3b0f3d4812..ff1a7a9b5ed04 100644 --- a/src/expr/impl/src/table_function/regexp_matches.rs +++ b/src/expr/impl/src/table_function/regexp_matches.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::array::ListValue; +use risingwave_common::array::{ListValue, Utf8Array}; use risingwave_expr::function; use crate::scalar::regexp::RegexpContext; @@ -37,8 +37,8 @@ fn regexp_matches<'a>( .unwrap() .iter() .skip(if skip_flag { 1 } else { 0 }) - .map(|mat| mat.map(|m| m.as_str().into())) - .collect(); - ListValue::new(list) + .map(|mat| mat.map(|m| m.as_str())) + .collect::(); + ListValue::new(list.into()) }) } diff --git a/src/expr/impl/src/table_function/unnest.rs b/src/expr/impl/src/table_function/unnest.rs index 7534b903565dd..bc170a5b8cc7b 100644 --- a/src/expr/impl/src/table_function/unnest.rs +++ b/src/expr/impl/src/table_function/unnest.rs @@ -21,5 +21,5 @@ use risingwave_expr::function; type_infer = "|args| Ok(args[0].unnest_list().clone())" )] fn unnest(list: ListRef<'_>) -> impl Iterator>> { - list.flatten().into_iter() + list.flatten().iter() } diff --git a/src/expr/macro/src/gen.rs b/src/expr/macro/src/gen.rs index 718c4a0b72e79..ba01bf05d6f27 100644 --- a/src/expr/macro/src/gen.rs +++ b/src/expr/macro/src/gen.rs @@ -756,7 +756,7 @@ impl FunctionAttr { quote! { state = #next_state; } }; let get_result = if custom_state.is_some() { - quote! { Ok(Some(state.downcast_ref::<#state_type>().into())) } + quote! { Ok(state.downcast_ref::<#state_type>().into()) } } else if let AggregateFnOrImpl::Impl(impl_) = user_fn && impl_.finalize.is_some() { diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index 4a08945b4d4eb..fc8f8bf4d1a73 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -966,12 +966,12 @@ impl Binder { .get_schema_by_name(&binder.db_name, schema_name) .is_ok() { - schema_names.push(Some(schema_name.into())); + schema_names.push(schema_name.as_str()); } } Ok(ExprImpl::literal_list( - ListValue::new(schema_names), + ListValue::from_iter(schema_names), DataType::Varchar, )) })), diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragments.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragments.rs index 6c20cc33ef2dd..dfaed4dd7983e 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragments.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragments.rs @@ -73,26 +73,19 @@ impl SysCatalogReaderImpl { Some(ScalarImpl::Utf8( distribution.distribution_type().as_str_name().into(), )), - Some(ScalarImpl::List(ListValue::new( - distribution - .state_table_ids - .into_iter() - .map(|id| Some(ScalarImpl::Int32(id as i32))) - .collect_vec(), + Some(ScalarImpl::List(ListValue::from_iter( + distribution.state_table_ids.into_iter().map(|id| id as i32), ))), - Some(ScalarImpl::List(ListValue::new( + Some(ScalarImpl::List(ListValue::from_iter( distribution .upstream_fragment_ids .into_iter() - .map(|id| Some(ScalarImpl::Int32(id as i32))) - .collect_vec(), + .map(|id| id as i32), ))), - Some(ScalarImpl::List(ListValue::new( + Some(ScalarImpl::List(ListValue::from_iter( Self::extract_fragment_type_flag(distribution.fragment_type_mask) .into_iter() - .flat_map(|t| t.as_str_name().strip_prefix("FRAGMENT_TYPE_FLAG_")) - .map(|t| Some(ScalarImpl::Utf8(t.into()))) - .collect_vec(), + .flat_map(|t| t.as_str_name().strip_prefix("FRAGMENT_TYPE_FLAG_")), ))), Some(ScalarImpl::Int32(distribution.parallelism as i32)), ]) diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_functions.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_functions.rs index 2602e0089a940..d290732975883 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_functions.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_functions.rs @@ -65,12 +65,8 @@ impl SysCatalogReaderImpl { Some(ScalarImpl::Int32(schema.id() as i32)), Some(ScalarImpl::Int32(function.owner as i32)), Some(ScalarImpl::Utf8(function.kind.to_string().into())), - Some(ScalarImpl::List(ListValue::new( - function - .arg_types - .iter() - .map(|t| Some(ScalarImpl::Int32(t.to_oid()))) - .collect_vec(), + Some(ScalarImpl::List(ListValue::from_iter( + function.arg_types.iter().map(|t| t.to_oid()), ))), Some(ScalarImpl::Int32(function.return_type.to_oid())), Some(ScalarImpl::Utf8(function.language.clone().into())), diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_indexes.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_indexes.rs index 70a4b2f088317..ab9f7b3f2eb22 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_indexes.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_indexes.rs @@ -60,12 +60,11 @@ impl SysCatalogReaderImpl { Some(ScalarImpl::Int32(index.id.index_id as i32)), Some(ScalarImpl::Utf8(index.name.clone().into())), Some(ScalarImpl::Int32(index.primary_table.id().table_id as i32)), - Some(ScalarImpl::List(ListValue::new( + Some(ScalarImpl::List(ListValue::from_iter( index .original_columns .iter() - .map(|index| Some(ScalarImpl::Int16(index.get_id() as i16 + 1))) - .collect_vec(), + .map(|index| index.get_id() as i16 + 1), ))), Some(ScalarImpl::Int32(schema.id() as i32)), Some(ScalarImpl::Int32(index.index_table.owner as i32)), diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs index 537aabbf5f172..acce4f2d592b2 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs @@ -82,12 +82,8 @@ impl SysCatalogReaderImpl { .to_uppercase() .into(), )), - Some(ScalarImpl::List(ListValue::new( - source - .columns - .iter() - .map(|c| Some(ScalarImpl::Utf8(c.name().into()))) - .collect_vec(), + Some(ScalarImpl::List(ListValue::from_iter( + source.columns.iter().map(|c| c.name()), ))), source .info diff --git a/src/frontend/src/expr/literal.rs b/src/frontend/src/expr/literal.rs index 2882243f93170..5843be54b64f6 100644 --- a/src/frontend/src/expr/literal.rs +++ b/src/frontend/src/expr/literal.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::array::list_array::display_for_explain; use risingwave_common::types::{literal_type_match, DataType, Datum, ToText}; use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt}; use risingwave_pb::expr::expr_node::RexNode; @@ -60,7 +59,7 @@ impl std::fmt::Debug for Literal { "'{}'", v.as_scalar_ref_impl().to_text_with_type(&data_type) ), - DataType::List { .. } => write!(f, "{}", display_for_explain(v.as_list())), + DataType::List { .. } => write!(f, "{}", v.as_list().display_for_explain()), }, }?; write!(f, ":{:?}", data_type) @@ -174,11 +173,7 @@ mod tests { #[test] fn test_list_to_value_encoding() { - let value = ListValue::new(vec![ - Some(ScalarImpl::Utf8("1".into())), - Some(ScalarImpl::Utf8("2".into())), - Some(ScalarImpl::Utf8("".into())), - ]); + let value = ListValue::from_iter(["1", "2", ""]); let data = Some(ScalarImpl::List(value.clone())); let node = literal_to_value_encoding(&data); if let RexNode::Constant(prost) = node { diff --git a/src/stream/src/executor/aggregation/minput.rs b/src/stream/src/executor/aggregation/minput.rs index 327ff909b4765..aeca502e8b6f5 100644 --- a/src/stream/src/executor/aggregation/minput.rs +++ b/src/stream/src/executor/aggregation/minput.rs @@ -294,7 +294,7 @@ mod tests { use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId}; use risingwave_common::row::OwnedRow; use risingwave_common::test_prelude::StreamChunkTestExt; - use risingwave_common::types::{DataType, ScalarImpl}; + use risingwave_common::types::{DataType, ListValue}; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_expr::aggregate::{build_append_only, AggCall}; @@ -1124,17 +1124,7 @@ mod tests { table.commit(epoch).await.unwrap(); let res = state.get_output(&table, group_key.as_ref(), &agg).await?; - match res { - Some(ScalarImpl::List(res)) => { - let res = res - .values() - .iter() - .map(|v| v.as_ref().map(ScalarImpl::as_int32).cloned()) - .collect_vec(); - assert_eq!(res, vec![Some(2), Some(1)]); - } - _ => panic!("unexpected output"), - } + assert_eq!(res.unwrap().as_list(), &ListValue::from_iter([2, 1])); } { @@ -1151,17 +1141,7 @@ mod tests { table.commit(epoch).await.unwrap(); let res = state.get_output(&table, group_key.as_ref(), &agg).await?; - match res { - Some(ScalarImpl::List(res)) => { - let res = res - .values() - .iter() - .map(|v| v.as_ref().map(ScalarImpl::as_int32).cloned()) - .collect_vec(); - assert_eq!(res, vec![Some(2), Some(2), Some(0), Some(1)]); - } - _ => panic!("unexpected output"), - } + assert_eq!(res.unwrap().as_list(), &ListValue::from_iter([2, 2, 0, 1])); } Ok(())