Skip to content

Commit

Permalink
impl map as wrapper of list
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Jul 24, 2024
1 parent cd30ac0 commit 3ad70d2
Show file tree
Hide file tree
Showing 48 changed files with 800 additions and 64 deletions.
2 changes: 2 additions & 0 deletions proto/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ message DataType {
JSONB = 18;
SERIAL = 19;
INT256 = 20;
MAP = 21;
}
TypeName type_name = 1;
// Data length for char.
Expand Down Expand Up @@ -102,6 +103,7 @@ enum ArrayType {
JSONB = 16;
SERIAL = 17;
INT256 = 18;
MAP = 20;
}

message Array {
Expand Down
3 changes: 3 additions & 0 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,9 @@ message ExprNode {
JSONB_TO_RECORD = 630;
JSONB_SET = 631;

// Map functions
MAP_FROM_ENTRIES = 700;

// Non-pure functions below (> 1000)
// ------------------------
// Internal functions
Expand Down
45 changes: 45 additions & 0 deletions src/common/src/array/arrow/arrow_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@

use std::fmt::Write;

use arrow_array::cast::AsArray;
use arrow_array_iceberg::array;
use arrow_buffer::OffsetBuffer;
use chrono::{DateTime, NaiveDateTime, NaiveTime};
use itertools::Itertools;
Expand Down Expand Up @@ -113,6 +115,7 @@ pub trait ToArrow {
ArrayImpl::Serial(array) => self.serial_to_arrow(array),
ArrayImpl::List(array) => self.list_to_arrow(data_type, array),
ArrayImpl::Struct(array) => self.struct_to_arrow(data_type, array),
ArrayImpl::Map(array) => self.map_to_arrow(data_type, array),
}?;
if arrow_array.data_type() != data_type {
arrow_cast::cast(&arrow_array, data_type).map_err(ArrayError::to_arrow)
Expand Down Expand Up @@ -267,6 +270,33 @@ pub trait ToArrow {
)))
}

#[inline]
fn map_to_arrow(
&self,
data_type: &arrow_schema::DataType,
array: &MapArray,
) -> Result<arrow_array::ArrayRef, ArrayError> {
let arrow_schema::DataType::Map(field, ordered) = data_type else {
return Err(ArrayError::to_arrow("Invalid map type"));
};
if *ordered {
return Err(ArrayError::to_arrow("Sorted map is not supported"));
}
let values = self
.struct_to_arrow(field.data_type(), array.as_struct())?
.as_struct()
.clone();
let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
Ok(Arc::new(arrow_array::MapArray::new(
field.clone(),
offsets,
values,
nulls,
*ordered,
)))
}

/// Convert RisingWave data type to Arrow data type.
///
/// This function returns a `Field` instead of `DataType` because some may be converted to
Expand Down Expand Up @@ -297,6 +327,7 @@ pub trait ToArrow {
DataType::Jsonb => return Ok(self.jsonb_type_to_arrow(name)),
DataType::Struct(fields) => self.struct_type_to_arrow(fields)?,
DataType::List(datatype) => self.list_type_to_arrow(datatype)?,
DataType::Map(datatype) => self.map_type_to_arrow(datatype)?,
};
Ok(arrow_schema::Field::new(name, data_type, true))
}
Expand Down Expand Up @@ -413,6 +444,20 @@ pub trait ToArrow {
.try_collect::<_, _, ArrayError>()?,
))
}

#[inline]
fn map_type_to_arrow(&self, map_type: &MapType) -> Result<arrow_schema::DataType, ArrayError> {
let sorted = false;
let list_type = map_type.clone().into_list();
Ok(arrow_schema::DataType::Map(
Arc::new(arrow_schema::Field::new(
"entries",
self.list_type_to_arrow(&list_type)?,
true,
)),
sorted,
))
}
}

/// Defines how to convert Arrow arrays to RisingWave arrays.
Expand Down
10 changes: 4 additions & 6 deletions src/common/src/array/list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use super::{
Array, ArrayBuilder, ArrayBuilderImpl, ArrayImpl, ArrayResult, BoolArray, PrimitiveArray,
PrimitiveArrayItemType, RowRef, Utf8Array,
};
use crate::array::struct_array::{quote_if_need, PG_NEED_QUOTE_CHARS};
use crate::bitmap::{Bitmap, BitmapBuilder};
use crate::row::Row;
use crate::types::{
Expand Down Expand Up @@ -56,6 +57,7 @@ impl ArrayBuilder for ListArrayBuilder {

#[cfg(test)]
fn new(capacity: usize) -> Self {
// TODO: deprecate this
Self::with_type(
capacity,
// Default datatype
Expand Down Expand Up @@ -500,6 +502,7 @@ impl From<ListValue> for ArrayImpl {
}
}

/// A slice of an array
#[derive(Copy, Clone)]
pub struct ListRef<'a> {
array: &'a ArrayImpl,
Expand Down Expand Up @@ -649,12 +652,7 @@ impl ToText for ListRef<'_> {
let need_quote = !matches!(datum_ref, None | Some(ScalarRefImpl::List(_)))
&& (s.is_empty()
|| s.to_ascii_lowercase() == "null"
|| s.contains([
'"', '\\', '{', '}', ',',
// PostgreSQL `array_isspace` includes '\x0B' but rust
// [`char::is_ascii_whitespace`] does not.
' ', '\t', '\n', '\r', '\x0B', '\x0C',
]));
|| s.contains(PG_NEED_QUOTE_CHARS));
if need_quote {
f(&"\"")?;
s.chars().try_for_each(|c| {
Expand Down
Loading

0 comments on commit 3ad70d2

Please sign in to comment.