Skip to content

Commit

Permalink
feat: add map type to scalar/array/expr (#17690)
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan authored Aug 12, 2024
1 parent 3707a1e commit b2b5a33
Show file tree
Hide file tree
Showing 49 changed files with 977 additions and 71 deletions.
2 changes: 2 additions & 0 deletions proto/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ message DataType {
JSONB = 18;
SERIAL = 19;
INT256 = 20;
MAP = 21;
}
TypeName type_name = 1;
// Data length for char.
Expand Down Expand Up @@ -102,6 +103,7 @@ enum ArrayType {
JSONB = 16;
SERIAL = 17;
INT256 = 18;
MAP = 20;
}

message Array {
Expand Down
3 changes: 3 additions & 0 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,9 @@ message ExprNode {
JSONB_TO_RECORD = 630;
JSONB_SET = 631;

// Map functions
MAP_FROM_ENTRIES = 700;

// Non-pure functions below (> 1000)
// ------------------------
// Internal functions
Expand Down
45 changes: 45 additions & 0 deletions src/common/src/array/arrow/arrow_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@

use std::fmt::Write;

use arrow_array::cast::AsArray;
use arrow_array_iceberg::array;
use arrow_buffer::OffsetBuffer;
use chrono::{DateTime, NaiveDateTime, NaiveTime};
use itertools::Itertools;
Expand Down Expand Up @@ -113,6 +115,7 @@ pub trait ToArrow {
ArrayImpl::Serial(array) => self.serial_to_arrow(array),
ArrayImpl::List(array) => self.list_to_arrow(data_type, array),
ArrayImpl::Struct(array) => self.struct_to_arrow(data_type, array),
ArrayImpl::Map(array) => self.map_to_arrow(data_type, array),
}?;
if arrow_array.data_type() != data_type {
arrow_cast::cast(&arrow_array, data_type).map_err(ArrayError::to_arrow)
Expand Down Expand Up @@ -267,6 +270,33 @@ pub trait ToArrow {
)))
}

#[inline]
fn map_to_arrow(
&self,
data_type: &arrow_schema::DataType,
array: &MapArray,
) -> Result<arrow_array::ArrayRef, ArrayError> {
let arrow_schema::DataType::Map(field, ordered) = data_type else {
return Err(ArrayError::to_arrow("Invalid map type"));
};
if *ordered {
return Err(ArrayError::to_arrow("Sorted map is not supported"));
}
let values = self
.struct_to_arrow(field.data_type(), array.as_struct())?
.as_struct()
.clone();
let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
Ok(Arc::new(arrow_array::MapArray::new(
field.clone(),
offsets,
values,
nulls,
*ordered,
)))
}

/// Convert RisingWave data type to Arrow data type.
///
/// This function returns a `Field` instead of `DataType` because some may be converted to
Expand Down Expand Up @@ -297,6 +327,7 @@ pub trait ToArrow {
DataType::Jsonb => return Ok(self.jsonb_type_to_arrow(name)),
DataType::Struct(fields) => self.struct_type_to_arrow(fields)?,
DataType::List(datatype) => self.list_type_to_arrow(datatype)?,
DataType::Map(datatype) => self.map_type_to_arrow(datatype)?,
};
Ok(arrow_schema::Field::new(name, data_type, true))
}
Expand Down Expand Up @@ -413,6 +444,20 @@ pub trait ToArrow {
.try_collect::<_, _, ArrayError>()?,
))
}

#[inline]
fn map_type_to_arrow(&self, map_type: &MapType) -> Result<arrow_schema::DataType, ArrayError> {
let sorted = false;
let list_type = map_type.clone().into_list();
Ok(arrow_schema::DataType::Map(
Arc::new(arrow_schema::Field::new(
"entries",
self.list_type_to_arrow(&list_type)?,
true,
)),
sorted,
))
}
}

/// Defines how to convert Arrow arrays to RisingWave arrays.
Expand Down
20 changes: 15 additions & 5 deletions src/common/src/array/list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ impl ArrayBuilder for ListArrayBuilder {

#[cfg(test)]
fn new(capacity: usize) -> Self {
// TODO: deprecate this
Self::with_type(
capacity,
// Default datatype
Expand Down Expand Up @@ -249,6 +250,12 @@ impl ListArray {
array.values.is_empty(),
"Must have no buffer in a list array"
);
debug_assert!(
(array.array_type == PbArrayType::List as i32)
|| (array.array_type == PbArrayType::Map as i32),
"invalid array type for list: {}",
array.array_type
);
let bitmap: Bitmap = array.get_null_bitmap()?.into();
let array_data = array.get_list_array_data()?.to_owned();
let flatten_len = match array_data.offsets.last() {
Expand Down Expand Up @@ -406,15 +413,15 @@ impl ListValue {
}

pub fn memcmp_deserialize(
datatype: &DataType,
item_datatype: &DataType,
deserializer: &mut memcomparable::Deserializer<impl Buf>,
) -> memcomparable::Result<Self> {
let bytes = serde_bytes::ByteBuf::deserialize(deserializer)?;
let mut inner_deserializer = memcomparable::Deserializer::new(bytes.as_slice());
let mut builder = datatype.create_array_builder(0);
let mut builder = item_datatype.create_array_builder(0);
while inner_deserializer.has_remaining() {
builder.append(memcmp_encoding::deserialize_datum_in_composite(
datatype,
item_datatype,
&mut inner_deserializer,
)?)
}
Expand Down Expand Up @@ -500,6 +507,7 @@ impl From<ListValue> for ArrayImpl {
}
}

/// A slice of an array
#[derive(Copy, Clone)]
pub struct ListRef<'a> {
array: &'a ArrayImpl,
Expand Down Expand Up @@ -650,10 +658,12 @@ impl ToText for ListRef<'_> {
&& (s.is_empty()
|| s.to_ascii_lowercase() == "null"
|| s.contains([
'"', '\\', '{', '}', ',',
'"', '\\', ',',
// whilespace:
// PostgreSQL `array_isspace` includes '\x0B' but rust
// [`char::is_ascii_whitespace`] does not.
' ', '\t', '\n', '\r', '\x0B', '\x0C',
' ', '\t', '\n', '\r', '\x0B', '\x0C', // list-specific:
'{', '}',
]));
if need_quote {
f(&"\"")?;
Expand Down
Loading

0 comments on commit b2b5a33

Please sign in to comment.