Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add map type to scalar/array/expr #17690

Merged
merged 11 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions proto/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ message DataType {
JSONB = 18;
SERIAL = 19;
INT256 = 20;
MAP = 21;
}
TypeName type_name = 1;
// Data length for char.
Expand Down Expand Up @@ -102,6 +103,7 @@ enum ArrayType {
JSONB = 16;
SERIAL = 17;
INT256 = 18;
MAP = 20;
}

message Array {
Expand Down
3 changes: 3 additions & 0 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,9 @@ message ExprNode {
JSONB_TO_RECORD = 630;
JSONB_SET = 631;

// Map functions
MAP_FROM_ENTRIES = 700;

// Non-pure functions below (> 1000)
// ------------------------
// Internal functions
Expand Down
45 changes: 45 additions & 0 deletions src/common/src/array/arrow/arrow_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@

use std::fmt::Write;

use arrow_array::cast::AsArray;
use arrow_array_iceberg::array;
use arrow_buffer::OffsetBuffer;
use chrono::{DateTime, NaiveDateTime, NaiveTime};
use itertools::Itertools;
Expand Down Expand Up @@ -113,6 +115,7 @@ pub trait ToArrow {
ArrayImpl::Serial(array) => self.serial_to_arrow(array),
ArrayImpl::List(array) => self.list_to_arrow(data_type, array),
ArrayImpl::Struct(array) => self.struct_to_arrow(data_type, array),
ArrayImpl::Map(array) => self.map_to_arrow(data_type, array),
}?;
if arrow_array.data_type() != data_type {
arrow_cast::cast(&arrow_array, data_type).map_err(ArrayError::to_arrow)
Expand Down Expand Up @@ -267,6 +270,33 @@ pub trait ToArrow {
)))
}

#[inline]
fn map_to_arrow(
&self,
data_type: &arrow_schema::DataType,
array: &MapArray,
) -> Result<arrow_array::ArrayRef, ArrayError> {
let arrow_schema::DataType::Map(field, ordered) = data_type else {
return Err(ArrayError::to_arrow("Invalid map type"));
};
if *ordered {
return Err(ArrayError::to_arrow("Sorted map is not supported"));
}
let values = self
.struct_to_arrow(field.data_type(), array.as_struct())?
.as_struct()
.clone();
let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
Ok(Arc::new(arrow_array::MapArray::new(
field.clone(),
offsets,
values,
nulls,
*ordered,
)))
}

/// Convert RisingWave data type to Arrow data type.
///
/// This function returns a `Field` instead of `DataType` because some may be converted to
Expand Down Expand Up @@ -297,6 +327,7 @@ pub trait ToArrow {
DataType::Jsonb => return Ok(self.jsonb_type_to_arrow(name)),
DataType::Struct(fields) => self.struct_type_to_arrow(fields)?,
DataType::List(datatype) => self.list_type_to_arrow(datatype)?,
DataType::Map(datatype) => self.map_type_to_arrow(datatype)?,
};
Ok(arrow_schema::Field::new(name, data_type, true))
}
Expand Down Expand Up @@ -413,6 +444,20 @@ pub trait ToArrow {
.try_collect::<_, _, ArrayError>()?,
))
}

#[inline]
fn map_type_to_arrow(&self, map_type: &MapType) -> Result<arrow_schema::DataType, ArrayError> {
let sorted = false;
let list_type = map_type.clone().into_list();
Ok(arrow_schema::DataType::Map(
Arc::new(arrow_schema::Field::new(
"entries",
self.list_type_to_arrow(&list_type)?,
true,
)),
sorted,
))
}
}

/// Defines how to convert Arrow arrays to RisingWave arrays.
Expand Down
20 changes: 15 additions & 5 deletions src/common/src/array/list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ impl ArrayBuilder for ListArrayBuilder {

#[cfg(test)]
fn new(capacity: usize) -> Self {
// TODO: deprecate this
Self::with_type(
capacity,
// Default datatype
Expand Down Expand Up @@ -249,6 +250,12 @@ impl ListArray {
array.values.is_empty(),
"Must have no buffer in a list array"
);
debug_assert!(
(array.array_type == PbArrayType::List as i32)
|| (array.array_type == PbArrayType::Map as i32),
"invalid array type for list: {}",
array.array_type
);
let bitmap: Bitmap = array.get_null_bitmap()?.into();
let array_data = array.get_list_array_data()?.to_owned();
let flatten_len = match array_data.offsets.last() {
Expand Down Expand Up @@ -406,15 +413,15 @@ impl ListValue {
}

pub fn memcmp_deserialize(
datatype: &DataType,
item_datatype: &DataType,
deserializer: &mut memcomparable::Deserializer<impl Buf>,
) -> memcomparable::Result<Self> {
let bytes = serde_bytes::ByteBuf::deserialize(deserializer)?;
let mut inner_deserializer = memcomparable::Deserializer::new(bytes.as_slice());
let mut builder = datatype.create_array_builder(0);
let mut builder = item_datatype.create_array_builder(0);
while inner_deserializer.has_remaining() {
builder.append(memcmp_encoding::deserialize_datum_in_composite(
datatype,
item_datatype,
&mut inner_deserializer,
)?)
}
Expand Down Expand Up @@ -500,6 +507,7 @@ impl From<ListValue> for ArrayImpl {
}
}

/// A slice of an array
#[derive(Copy, Clone)]
pub struct ListRef<'a> {
array: &'a ArrayImpl,
Expand Down Expand Up @@ -650,10 +658,12 @@ impl ToText for ListRef<'_> {
&& (s.is_empty()
|| s.to_ascii_lowercase() == "null"
|| s.contains([
'"', '\\', '{', '}', ',',
'"', '\\', ',',
// whilespace:
// PostgreSQL `array_isspace` includes '\x0B' but rust
// [`char::is_ascii_whitespace`] does not.
' ', '\t', '\n', '\r', '\x0B', '\x0C',
' ', '\t', '\n', '\r', '\x0B', '\x0C', // list-specific:
xxchan marked this conversation as resolved.
Show resolved Hide resolved
'{', '}',
]));
if need_quote {
f(&"\"")?;
Expand Down
Loading
Loading