diff --git a/proto/data.proto b/proto/data.proto index 06d223d142bf9..9bb15ebcc8d62 100644 --- a/proto/data.proto +++ b/proto/data.proto @@ -52,6 +52,7 @@ message DataType { JSONB = 18; SERIAL = 19; INT256 = 20; + MAP = 21; } TypeName type_name = 1; // Data length for char. @@ -102,6 +103,7 @@ enum ArrayType { JSONB = 16; SERIAL = 17; INT256 = 18; + MAP = 20; } message Array { diff --git a/proto/expr.proto b/proto/expr.proto index 9887505bf61dc..1531984291028 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -283,6 +283,9 @@ message ExprNode { JSONB_TO_RECORD = 630; JSONB_SET = 631; + // Map functions + MAP_FROM_ENTRIES = 700; + // Non-pure functions below (> 1000) // ------------------------ // Internal functions diff --git a/src/common/src/array/arrow/arrow_impl.rs b/src/common/src/array/arrow/arrow_impl.rs index f4ca022ffd7fa..7d69b50afed49 100644 --- a/src/common/src/array/arrow/arrow_impl.rs +++ b/src/common/src/array/arrow/arrow_impl.rs @@ -42,6 +42,8 @@ use std::fmt::Write; +use arrow_array::cast::AsArray; +use arrow_array_iceberg::array; use arrow_buffer::OffsetBuffer; use chrono::{DateTime, NaiveDateTime, NaiveTime}; use itertools::Itertools; @@ -113,6 +115,7 @@ pub trait ToArrow { ArrayImpl::Serial(array) => self.serial_to_arrow(array), ArrayImpl::List(array) => self.list_to_arrow(data_type, array), ArrayImpl::Struct(array) => self.struct_to_arrow(data_type, array), + ArrayImpl::Map(array) => self.map_to_arrow(data_type, array), }?; if arrow_array.data_type() != data_type { arrow_cast::cast(&arrow_array, data_type).map_err(ArrayError::to_arrow) @@ -267,6 +270,33 @@ pub trait ToArrow { ))) } + #[inline] + fn map_to_arrow( + &self, + data_type: &arrow_schema::DataType, + array: &MapArray, + ) -> Result { + let arrow_schema::DataType::Map(field, ordered) = data_type else { + return Err(ArrayError::to_arrow("Invalid map type")); + }; + if *ordered { + return Err(ArrayError::to_arrow("Sorted map is not supported")); + } + let values = self + .struct_to_arrow(field.data_type(), array.as_struct())? + .as_struct() + .clone(); + let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect()); + let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into()); + Ok(Arc::new(arrow_array::MapArray::new( + field.clone(), + offsets, + values, + nulls, + *ordered, + ))) + } + /// Convert RisingWave data type to Arrow data type. /// /// This function returns a `Field` instead of `DataType` because some may be converted to @@ -297,6 +327,7 @@ pub trait ToArrow { DataType::Jsonb => return Ok(self.jsonb_type_to_arrow(name)), DataType::Struct(fields) => self.struct_type_to_arrow(fields)?, DataType::List(datatype) => self.list_type_to_arrow(datatype)?, + DataType::Map(datatype) => self.map_type_to_arrow(datatype)?, }; Ok(arrow_schema::Field::new(name, data_type, true)) } @@ -413,6 +444,20 @@ pub trait ToArrow { .try_collect::<_, _, ArrayError>()?, )) } + + #[inline] + fn map_type_to_arrow(&self, map_type: &MapType) -> Result { + let sorted = false; + let list_type = map_type.clone().into_list(); + Ok(arrow_schema::DataType::Map( + Arc::new(arrow_schema::Field::new( + "entries", + self.list_type_to_arrow(&list_type)?, + true, + )), + sorted, + )) + } } /// Defines how to convert Arrow arrays to RisingWave arrays. 
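[Editor's note] The conversion above targets Arrow's standard Map layout: a list of `entries` structs plus a `sorted` flag. Below is a minimal sketch of the shape produced for a RisingWave `map(varchar,int)`, assuming the `arrow_schema` crate; the `entries`/`key`/`value` names follow the diff, while the nullability flags here are illustrative.

use std::sync::Arc;
use arrow_schema::{DataType, Field, Fields};

fn arrow_type_for_varchar_int_map() -> DataType {
    let entries = Field::new(
        "entries",
        DataType::Struct(Fields::from(vec![
            Field::new("key", DataType::Utf8, true),
            Field::new("value", DataType::Int32, true),
        ])),
        true,
    );
    // `sorted = false`: RisingWave maps are not kept ordered by key.
    DataType::Map(Arc::new(entries), false)
}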
diff --git a/src/common/src/array/list_array.rs b/src/common/src/array/list_array.rs index 7fc1fdecee6fe..e7d4d780ea8f5 100644 --- a/src/common/src/array/list_array.rs +++ b/src/common/src/array/list_array.rs @@ -56,6 +56,7 @@ impl ArrayBuilder for ListArrayBuilder { #[cfg(test)] fn new(capacity: usize) -> Self { + // TODO: deprecate this Self::with_type( capacity, // Default datatype @@ -249,6 +250,12 @@ impl ListArray { array.values.is_empty(), "Must have no buffer in a list array" ); + debug_assert!( + (array.array_type == PbArrayType::List as i32) + || (array.array_type == PbArrayType::Map as i32), + "invalid array type for list: {}", + array.array_type + ); let bitmap: Bitmap = array.get_null_bitmap()?.into(); let array_data = array.get_list_array_data()?.to_owned(); let flatten_len = match array_data.offsets.last() { @@ -406,15 +413,15 @@ impl ListValue { } pub fn memcmp_deserialize( - datatype: &DataType, + item_datatype: &DataType, deserializer: &mut memcomparable::Deserializer, ) -> memcomparable::Result { let bytes = serde_bytes::ByteBuf::deserialize(deserializer)?; let mut inner_deserializer = memcomparable::Deserializer::new(bytes.as_slice()); - let mut builder = datatype.create_array_builder(0); + let mut builder = item_datatype.create_array_builder(0); while inner_deserializer.has_remaining() { builder.append(memcmp_encoding::deserialize_datum_in_composite( - datatype, + item_datatype, &mut inner_deserializer, )?) } @@ -500,6 +507,7 @@ impl From for ArrayImpl { } } +/// A slice of an array #[derive(Copy, Clone)] pub struct ListRef<'a> { array: &'a ArrayImpl, @@ -650,10 +658,12 @@ impl ToText for ListRef<'_> { && (s.is_empty() || s.to_ascii_lowercase() == "null" || s.contains([ - '"', '\\', '{', '}', ',', + '"', '\\', ',', + // whilespace: // PostgreSQL `array_isspace` includes '\x0B' but rust // [`char::is_ascii_whitespace`] does not. - ' ', '\t', '\n', '\r', '\x0B', '\x0C', + ' ', '\t', '\n', '\r', '\x0B', '\x0C', // list-specific: + '{', '}', ])); if need_quote { f(&"\"")?; diff --git a/src/common/src/array/map_array.rs b/src/common/src/array/map_array.rs new file mode 100644 index 0000000000000..e305da61f8a68 --- /dev/null +++ b/src/common/src/array/map_array.rs @@ -0,0 +1,444 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
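// Editor's note: a hedged sketch (not part of the actual file) of how a map value relates
// to the list-of-struct representation documented below. The exact constructor calls are
// assumptions based on the constructors defined later in this file:
//
//   // `{1: 10, 2: 20}` with `map(int,int)` is physically the entry list `[(1, 10), (2, 20)]`
//   let keys = ListValue::from_iter([1i32, 2]);
//   let values = ListValue::from_iter([10i32, 20]);
//   let map = MapValue::try_from_kv(keys, values).unwrap();
//   // `Eq` / `Ord` / `Hash` first sort the entries by key, so a map built from the
//   // reversed entry order compares equal to this one.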
+ +use std::cmp::Ordering; +use std::fmt::{self, Debug, Display}; + +use bytes::{Buf, BufMut}; +use itertools::Itertools; +use risingwave_common_estimate_size::EstimateSize; +use risingwave_pb::data::{PbArray, PbArrayType}; +use serde::Serializer; + +use super::{ + Array, ArrayBuilder, ArrayImpl, ArrayResult, DatumRef, DefaultOrdered, ListArray, + ListArrayBuilder, ListRef, ListValue, MapType, ScalarRef, ScalarRefImpl, StructArray, +}; +use crate::bitmap::Bitmap; +use crate::types::{DataType, Scalar, ToText}; +use crate::util::memcmp_encoding; + +#[derive(Debug, Clone, EstimateSize)] +pub struct MapArrayBuilder { + inner: ListArrayBuilder, +} + +impl ArrayBuilder for MapArrayBuilder { + type ArrayType = MapArray; + + #[cfg(not(test))] + fn new(_capacity: usize) -> Self { + panic!("please use `MapArrayBuilder::with_type` instead"); + } + + #[cfg(test)] + fn new(capacity: usize) -> Self { + Self::with_type( + capacity, + DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Varchar)), + ) + } + + fn with_type(capacity: usize, ty: DataType) -> Self { + let inner = ListArrayBuilder::with_type(capacity, ty.into_map().into_list()); + Self { inner } + } + + fn append_n(&mut self, n: usize, value: Option<MapRef<'_>>) { + self.inner.append_n(n, value.map(|v| v.into_inner())); + } + + fn append_array(&mut self, other: &MapArray) { + self.inner.append_array(&other.inner); + } + + fn pop(&mut self) -> Option<()> { + self.inner.pop() + } + + fn len(&self) -> usize { + self.inner.len() + } + + fn finish(self) -> MapArray { + let inner = self.inner.finish(); + MapArray { inner } + } +} + +/// `MapArray` is physically just a `List<Struct<key, value>>` array, but with some additional restrictions. +/// +/// Type: +/// - `key`'s datatype can only be string & integral types. (See [`MapType::check_key_type_valid`].) +/// - `value` can be any type. +/// +/// Value (for each map value in the array): +/// - `key`s are non-null and unique. +/// +/// - `key`s and `value`s must be of the same length. +/// For a `MapArray`, it's sliced by the `ListArray`'s offsets, so it essentially means the +/// `key` and `value` children arrays have the same length. +/// +/// - The lists are NOT sorted by `key`. +/// +/// - `Eq` / `Hash` / `Ord` for map: +/// +/// It's controversial because the physical representation is just an unordered list. +/// In many systems (e.g., `DuckDB` and `ClickHouse`), `{"k1":"v1","k2":"v2"} != {"k2":"v2","k1":"v1"}`. +/// But the reverse definition might be more intuitive, especially when ingesting Avro/Protobuf data. +/// +/// To avoid controversy, we wanted to ban all usages and make the implementation `unreachable!()`, +/// but it's hard since these implementations can be used in different places: +/// * Explicitly in user-facing functions (e.g., comparison operators). These could be avoided completely. +/// * Implicitly in keys (group by / order by / primary key). These could also be banned, but it's harder. +/// * Some internal usages. One example is `_row_id`. See . +/// It might be solvable, but we are not sure whether it's depended on somewhere else. +/// +/// Considering these, it might be better to still choose a _well-defined_ behavior instead +/// of using `unreachable`. We should try to have a consistent definition for these operations to minimize possible surprises. +/// And we could still try our best to ban it to prevent misuse. +/// +/// Currently we choose the second behavior, i.e., first sort the map by key, then compare/hash. +/// Note that `Eq` is intuitive, but `Ord` still looks strange.
We assume no users really care about +/// which map is larger, but just provide an implementation to prevent undefined behavior. +/// +/// See more discussion in . +/// +/// +/// Note that the decisions above are not definitive. We just choose to be conservative at the beginning. +#[derive(Debug, Clone, Eq)] +pub struct MapArray { + pub(super) inner: ListArray, +} + +impl EstimateSize for MapArray { + fn estimated_heap_size(&self) -> usize { + self.inner.estimated_heap_size() + } +} + +impl Array for MapArray { + type Builder = MapArrayBuilder; + type OwnedItem = MapValue; + type RefItem<'a> = MapRef<'a>; + + unsafe fn raw_value_at_unchecked(&self, idx: usize) -> Self::RefItem<'_> { + let list = self.inner.raw_value_at_unchecked(idx); + MapRef::new_unchecked(list) + } + + fn len(&self) -> usize { + self.inner.len() + } + + fn to_protobuf(&self) -> PbArray { + let mut array = self.inner.to_protobuf(); + array.array_type = PbArrayType::Map as i32; + array + } + + fn null_bitmap(&self) -> &Bitmap { + self.inner.null_bitmap() + } + + fn into_null_bitmap(self) -> Bitmap { + self.inner.into_null_bitmap() + } + + fn set_bitmap(&mut self, bitmap: Bitmap) { + self.inner.set_bitmap(bitmap) + } + + fn data_type(&self) -> DataType { + let list_value_type = self.inner.values().data_type(); + DataType::Map(MapType::from_list_entries(list_value_type)) + } +} + +impl MapArray { + pub fn from_protobuf(array: &PbArray) -> ArrayResult<ArrayImpl> { + let inner = ListArray::from_protobuf(array)?.into_list(); + Ok(Self { inner }.into()) + } + + /// Returns the inner struct array of the underlying list array. + pub fn as_struct(&self) -> &StructArray { + self.inner.values().as_struct() + } + + /// Returns the offsets of this map. + pub fn offsets(&self) -> &[u32] { + self.inner.offsets() + } +} + +pub use scalar::{MapRef, MapValue}; + +/// The invariants (see [`MapArray`]) would otherwise have to be enforced in too many places +/// (`MapValue`, `MapRef` and `MapArray`). +/// +/// So we define the types and constructors in a separate `mod` +/// to prevent direct construction. +/// We only check the invariants in the constructors. +/// After they are constructed, we assume the invariants hold. +mod scalar { + use super::*; + + /// Refer to [`MapArray`] for the invariants of a map value. + #[derive(Clone, Eq, EstimateSize)] + pub struct MapValue(ListValue); + + /// A map is just a slice of the underlying struct array. + /// + /// Refer to [`MapArray`] for the invariants of a map value. + /// + /// XXX: perhaps we can make it `MapRef<'a, 'b>(ListRef<'a>, ListRef<'b>);`. + /// Then we can build a map ref from 2 list refs without copying the data. + /// Currently it's impossible. + /// + #[derive(Copy, Clone, Eq)] + pub struct MapRef<'a>(ListRef<'a>); + + impl MapValue { + pub fn inner(&self) -> &ListValue { + &self.0 + } + + pub fn into_inner(self) -> ListValue { + self.0 + } + + /// # Panics + /// Panics if [map invariants](`super::MapArray`) are violated. + pub fn from_list_entries(list: ListValue) -> Self { + // validates that the list type is valid + _ = MapType::from_list_entries(list.data_type()); + // TODO: validate that the values are valid + MapValue(list) + } + + /// # Panics + /// Panics if [map invariants](`super::MapArray`) are violated.
+ pub fn try_from_kv(key: ListValue, value: ListValue) -> Result<Self, String> { + if key.len() != value.len() { + return Err("map keys and values have different length".to_string()); + } + let unique_keys = key.iter().unique().collect_vec(); + if unique_keys.len() != key.len() { + return Err("map keys must be unique".to_string()); + } + if unique_keys.contains(&None) { + return Err("map keys must not be NULL".to_string()); + } + + let len = key.len(); + let key_type = key.data_type(); + let value_type = value.data_type(); + let struct_array = StructArray::new( + MapType::struct_type_for_map(key_type, value_type), + vec![key.into_array().into_ref(), value.into_array().into_ref()], + Bitmap::ones(len), + ); + Ok(MapValue(ListValue::new(struct_array.into()))) + } + } + + impl<'a> MapRef<'a> { + /// # Safety + /// The caller must ensure the invariants of a map value. + pub unsafe fn new_unchecked(list: ListRef<'a>) -> Self { + MapRef(list) + } + + pub fn inner(&self) -> &ListRef<'a> { + &self.0 + } + + pub fn into_inner(self) -> ListRef<'a> { + self.0 + } + } + + impl Scalar for MapValue { + type ScalarRefType<'a> = MapRef<'a>; + + fn as_scalar_ref(&self) -> MapRef<'_> { + // MapValue is assumed to be valid, so we just construct directly without checking the invariants. + MapRef(self.0.as_scalar_ref()) + } + } + + impl<'a> ScalarRef<'a> for MapRef<'a> { + type ScalarType = MapValue; + + fn to_owned_scalar(&self) -> MapValue { + // MapRef is assumed to be valid, so we just construct directly without checking the invariants. + MapValue(self.0.to_owned_scalar()) + } + + fn hash_scalar<H: std::hash::Hasher>(&self, state: &mut H) { + for (k, v) in self.iter_sorted() { + super::super::hash_datum(Some(k), state); + super::super::hash_datum(v, state); + } + } + } +} + +/// Refer to [`MapArray`] for the semantics of the comparison. +mod cmp { + use super::*; + use crate::array::DefaultOrd; + impl PartialEq for MapArray { + fn eq(&self, other: &Self) -> bool { + self.iter().eq(other.iter()) + } + } + + impl PartialEq for MapValue { + fn eq(&self, other: &Self) -> bool { + self.as_scalar_ref().eq(&other.as_scalar_ref()) + } + } + + impl PartialOrd for MapValue { + fn partial_cmp(&self, other: &Self) -> Option<Ordering> { + Some(self.cmp(other)) + } + } + + impl Ord for MapValue { + fn cmp(&self, other: &Self) -> Ordering { + self.as_scalar_ref().cmp(&other.as_scalar_ref()) + } + } + + impl PartialEq for MapRef<'_> { + fn eq(&self, other: &Self) -> bool { + self.iter_sorted().eq(other.iter_sorted()) + } + } + + impl Ord for MapRef<'_> { + fn cmp(&self, other: &Self) -> Ordering { + self.iter_sorted() + .cmp_by(other.iter_sorted(), |(k1, v1), (k2, v2)| { + k1.default_cmp(&k2).then_with(|| v1.default_cmp(&v2)) + }) + } + } + + impl PartialOrd for MapRef<'_> { + fn partial_cmp(&self, other: &Self) -> Option<Ordering> { + Some(self.cmp(other)) + } + } +} + +impl Debug for MapValue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.as_scalar_ref().fmt(f) + } +} + +impl Display for MapValue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.as_scalar_ref().write(f) + } +} + +impl<'a> MapRef<'a> { + /// Iterates over the elements of the map.
+ pub fn iter( + self, + ) -> impl DoubleEndedIterator + ExactSizeIterator<Item = (ScalarRefImpl<'a>, DatumRef<'a>)> + 'a + { + self.inner().iter().map(|list_elem| { + let list_elem = list_elem.expect("the list element in map should not be null"); + let struct_ = list_elem.into_struct(); + let (k, v) = struct_ + .iter_fields_ref() + .next_tuple() + .expect("the struct in map should have exactly 2 fields"); + (k.expect("map key should not be null"), v) + }) + } + + pub fn iter_sorted( + self, + ) -> impl DoubleEndedIterator + ExactSizeIterator<Item = (ScalarRefImpl<'a>, DatumRef<'a>)> + 'a + { + self.iter().sorted_by_key(|(k, _v)| DefaultOrdered(*k)) + } + + /// Note: Map should not be used as a key. But we don't want to panic. + /// See [`MapArray`] for the semantics. See also the `Ord` implementation. + /// TODO: ban it in the frontend + pub fn memcmp_serialize( + self, + serializer: &mut memcomparable::Serializer<impl BufMut>, + ) -> memcomparable::Result<()> { + let mut inner_serializer = memcomparable::Serializer::new(vec![]); + for (k, v) in self.iter_sorted() { + memcmp_encoding::serialize_datum_in_composite(Some(k), &mut inner_serializer)?; + memcmp_encoding::serialize_datum_in_composite(v, &mut inner_serializer)?; + } + serializer.serialize_bytes(&inner_serializer.into_inner()) + } +} + +impl MapValue { + /// Note: Map should not be used as a key. But we don't want to panic. + /// See [`MapArray`] for the semantics. See also the `Ord` implementation. + /// TODO: ban it in the frontend + pub fn memcmp_deserialize( + datatype: &MapType, + deserializer: &mut memcomparable::Deserializer<impl Buf>, + ) -> memcomparable::Result<Self> { + let list = ListValue::memcmp_deserialize( + &DataType::Struct(datatype.clone().into_struct()), + deserializer, + )?; + Ok(Self::from_list_entries(list)) + } +} + +impl Debug for MapRef<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_list().entries(self.inner().iter()).finish() + } +} + +impl ToText for MapRef<'_> { + fn write<W: std::fmt::Write>(&self, f: &mut W) -> std::fmt::Result { + // Note: This is arbitrarily decided... + write!( + f, + "{{{}}}", + self.iter().format_with(",", |(key, value), f| { + let key = key.to_text(); + let value = value.to_text(); + // TODO: consider quoting like list and struct + f(&format_args!("\"{}\":{}", key, value)) + }) + ) + } + + fn write_with_type<W: std::fmt::Write>(&self, ty: &DataType, f: &mut W) -> std::fmt::Result { + match ty { + DataType::Map { .. } => self.write(f), + _ => unreachable!(), + } + } +} diff --git a/src/common/src/array/mod.rs b/src/common/src/array/mod.rs index 89b3b06266786..ae6f7d0fa144b 100644 --- a/src/common/src/array/mod.rs +++ b/src/common/src/array/mod.rs @@ -26,6 +26,7 @@ pub mod interval_array; mod iterator; mod jsonb_array; pub mod list_array; +mod map_array; mod num256_array; mod primitive_array; mod proto_reader; @@ -53,6 +54,7 @@ pub use interval_array::{IntervalArray, IntervalArrayBuilder}; pub use iterator::ArrayIterator; pub use jsonb_array::{JsonbArray, JsonbArrayBuilder}; pub use list_array::{ListArray, ListArrayBuilder, ListRef, ListValue}; +pub use map_array::{MapArray, MapArrayBuilder, MapRef, MapValue}; use paste::paste; pub use primitive_array::{PrimitiveArray, PrimitiveArrayBuilder, PrimitiveArrayItemType}; use risingwave_common_estimate_size::EstimateSize; @@ -104,6 +106,7 @@ pub trait ArrayBuilder: Send + Sync + Sized + 'static { type ArrayType: Array; /// Create a new builder with `capacity`. + /// TODO: remove this function from the trait. Let it be a method of each concrete builder.
fn new(capacity: usize) -> Self; /// # Panics @@ -135,6 +138,8 @@ pub trait ArrayBuilder: Send + Sync + Sized + 'static { /// Pop an element from the builder. /// + /// It's used in `rollback` in source parser. + /// /// # Returns /// /// Returns `None` if there is no elements in the builder. @@ -331,6 +336,10 @@ macro_rules! array_impl_enum { for_all_array_variants! { array_impl_enum } +// We cannot put the From implementations in impl_convert, +// because then we can't prove for all `T: PrimitiveArrayItemType`, +// it's implemented. + impl From> for ArrayImpl { fn from(arr: PrimitiveArray) -> Self { T::erase_array_type(arr) @@ -379,6 +388,12 @@ impl From for ArrayImpl { } } +impl From for ArrayImpl { + fn from(arr: MapArray) -> Self { + Self::Map(arr) + } +} + /// `impl_convert` implements several conversions for `Array` and `ArrayBuilder`. /// * `ArrayImpl -> &Array` with `impl.as_int16()`. /// * `ArrayImpl -> Array` with `impl.into_int16()`. @@ -390,6 +405,9 @@ macro_rules! impl_convert { $( paste! { impl ArrayImpl { + /// # Panics + /// + /// Panics if type mismatches. pub fn [](&self) -> &$array { match self { Self::$variant_name(ref array) => array, @@ -397,6 +415,9 @@ macro_rules! impl_convert { } } + /// # Panics + /// + /// Panics if type mismatches. pub fn [](self) -> $array { match self { Self::$variant_name(array) => array, @@ -405,6 +426,7 @@ macro_rules! impl_convert { } } + // FIXME: panic in From here is not proper. impl <'a> From<&'a ArrayImpl> for &'a $array { fn from(array: &'a ArrayImpl) -> Self { match array { diff --git a/src/common/src/array/proto_reader.rs b/src/common/src/array/proto_reader.rs index 3238368ed1041..f5f61ab9ce893 100644 --- a/src/common/src/array/proto_reader.rs +++ b/src/common/src/array/proto_reader.rs @@ -43,6 +43,7 @@ impl ArrayImpl { PbArrayType::List => ListArray::from_protobuf(array)?, PbArrayType::Bytea => read_string_array::(array, cardinality)?, PbArrayType::Int256 => Int256Array::from_protobuf(array, cardinality)?, + PbArrayType::Map => MapArray::from_protobuf(array)?, }; Ok(array) } diff --git a/src/common/src/array/struct_array.rs b/src/common/src/array/struct_array.rs index 22aae00c84f4c..9c3bd23653815 100644 --- a/src/common/src/array/struct_array.rs +++ b/src/common/src/array/struct_array.rs @@ -498,15 +498,25 @@ impl ToText for StructRef<'_> { } /// Double quote a string if it contains any special characters. -fn quote_if_need(input: &str, writer: &mut impl Write) -> std::fmt::Result { +pub fn quote_if_need(input: &str, writer: &mut impl Write) -> std::fmt::Result { + // Note: for struct here, 'null' as a string is not quoted, but for list it's quoted: + // ```sql + // select row('a','a b','null'), array['a','a b','null']; + // ---- + // (a,"a b",null) {a,"a b","null"} + // ``` if !input.is_empty() // non-empty - && !input.contains([ - '"', '\\', '(', ')', ',', - // PostgreSQL `array_isspace` includes '\x0B' but rust - // [`char::is_ascii_whitespace`] does not. - ' ', '\t', '\n', '\r', '\x0B', '\x0C', - ]) - { + && !input.contains( + [ + '"', '\\', ',', + // whilespace: + // PostgreSQL `array_isspace` includes '\x0B' but rust + // [`char::is_ascii_whitespace`] does not. 
+ ' ', '\t', '\n', '\r', '\x0B', '\x0C', + // struct-specific: + '(',')' +] + ) { return writer.write_str(input); } diff --git a/src/common/src/hash/key.rs b/src/common/src/hash/key.rs index 61a41e9a7365d..96928e69f4a83 100644 --- a/src/common/src/hash/key.rs +++ b/src/common/src/hash/key.rs @@ -33,7 +33,7 @@ use risingwave_common_estimate_size::EstimateSize; use smallbitset::Set64; use static_assertions::const_assert_eq; -use crate::array::{ListValue, StructValue}; +use crate::array::{ListValue, MapValue, StructValue}; use crate::types::{ DataType, Date, Decimal, Int256, Int256Ref, JsonbVal, Scalar, ScalarRef, ScalarRefImpl, Serial, Time, Timestamp, Timestamptz, F32, F64, @@ -627,6 +627,7 @@ impl_value_encoding_hash_key_serde!(JsonbVal); // use the memcmp encoding for safety. impl_memcmp_encoding_hash_key_serde!(StructValue); impl_memcmp_encoding_hash_key_serde!(ListValue); +impl_memcmp_encoding_hash_key_serde!(MapValue); #[cfg(test)] mod tests { diff --git a/src/common/src/test_utils/rand_array.rs b/src/common/src/test_utils/rand_array.rs index 0e2e1d92e28c3..a7c13e3178f26 100644 --- a/src/common/src/test_utils/rand_array.rs +++ b/src/common/src/test_utils/rand_array.rs @@ -24,10 +24,10 @@ use rand::prelude::Distribution; use rand::rngs::SmallRng; use rand::{Rng, SeedableRng}; -use crate::array::{Array, ArrayBuilder, ArrayRef, ListValue, StructValue}; +use crate::array::{Array, ArrayBuilder, ArrayRef, ListValue, MapValue, StructValue}; use crate::types::{ - Date, Decimal, Int256, Interval, JsonbVal, NativeType, Scalar, Serial, Time, Timestamp, - Timestamptz, + DataType, Date, Decimal, Int256, Interval, JsonbVal, MapType, NativeType, Scalar, Serial, Time, + Timestamp, Timestamptz, }; pub trait RandValue { @@ -151,6 +151,15 @@ impl RandValue for ListValue { } } +impl RandValue for MapValue { + fn rand_value(_rand: &mut R) -> Self { + // dummy value + MapValue::from_list_entries(ListValue::empty(&DataType::Struct( + MapType::struct_type_for_map(DataType::Varchar, DataType::Varchar), + ))) + } +} + pub fn rand_array(rand: &mut R, size: usize, null_ratio: f64) -> A where A: Array, diff --git a/src/common/src/test_utils/rand_chunk.rs b/src/common/src/test_utils/rand_chunk.rs index 3e537fd9b6a49..9c604b6205cc3 100644 --- a/src/common/src/test_utils/rand_chunk.rs +++ b/src/common/src/test_utils/rand_chunk.rs @@ -43,10 +43,11 @@ pub fn gen_chunk(data_types: &[DataType], size: usize, seed: u64, null_ratio: f6 } DataType::Interval => seed_rand_array_ref::(size, seed, null_ratio), DataType::Int256 => seed_rand_array_ref::(size, seed, null_ratio), - DataType::Struct(_) | DataType::Bytea | DataType::Jsonb => { - todo!() - } - DataType::List(_) => { + DataType::Struct(_) + | DataType::Bytea + | DataType::Jsonb + | DataType::List(_) + | DataType::Map(_) => { todo!() } }); diff --git a/src/common/src/types/macros.rs b/src/common/src/types/macros.rs index 520e4ab8f45ee..1dd29156dd651 100644 --- a/src/common/src/types/macros.rs +++ b/src/common/src/types/macros.rs @@ -58,6 +58,7 @@ macro_rules! 
for_all_variants { { Serial, Serial, serial, $crate::types::Serial, $crate::types::Serial, $crate::array::SerialArray, $crate::array::SerialArrayBuilder }, { Struct, Struct, struct, $crate::types::StructValue, $crate::types::StructRef<'scalar>, $crate::array::StructArray, $crate::array::StructArrayBuilder }, { List, List, list, $crate::types::ListValue, $crate::types::ListRef<'scalar>, $crate::array::ListArray, $crate::array::ListArrayBuilder }, + { Map, Map, map, $crate::types::MapValue, $crate::types::MapRef<'scalar>, $crate::array::MapArray, $crate::array::MapArrayBuilder }, { Bytea, Bytea, bytea, Box<[u8]>, &'scalar [u8], $crate::array::BytesArray, $crate::array::BytesArrayBuilder } } }; diff --git a/src/common/src/types/map_type.rs b/src/common/src/types/map_type.rs new file mode 100644 index 0000000000000..11600bb45b42d --- /dev/null +++ b/src/common/src/types/map_type.rs @@ -0,0 +1,142 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Formatter; + +use anyhow::Context; + +use super::*; + +/// Refer to [`super::super::array::MapArray`] for the invariants of a map value. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct MapType(Box<(DataType, DataType)>); + +impl From for DataType { + fn from(value: MapType) -> Self { + DataType::Map(value) + } +} + +impl MapType { + /// # Panics + /// Panics if the key type is not valid for a map. + pub fn from_kv(key: DataType, value: DataType) -> Self { + Self::check_key_type_valid(&key).unwrap(); + Self(Box::new((key, value))) + } + + pub fn try_from_kv(key: DataType, value: DataType) -> Result { + Self::check_key_type_valid(&key)?; + Ok(Self(Box::new((key, value)))) + } + + /// # Panics + /// Panics if the key type is not valid for a map, or the + /// entries type is not a valid struct type. + pub fn from_list_entries(list_entries_type: DataType) -> Self { + let struct_type = list_entries_type.as_struct(); + let (k, v) = struct_type + .iter() + .collect_tuple() + .expect("the underlying struct for map must have exactly two fields"); + // the field names are not strictly enforced + // Currently this panics for SELECT * FROM t + // if cfg!(debug_assertions) { + // itertools::assert_equal(struct_type.names(), ["key", "value"]); + // } + Self::from_kv(k.1.clone(), v.1.clone()) + } + + /// # Panics + /// Panics if the key type is not valid for a map. + pub fn struct_type_for_map(key_type: DataType, value_type: DataType) -> StructType { + MapType::check_key_type_valid(&key_type).unwrap(); + StructType::new(vec![("key", key_type), ("value", value_type)]) + } + + pub fn key(&self) -> &DataType { + &self.0 .0 + } + + pub fn value(&self) -> &DataType { + &self.0 .1 + } + + pub fn into_struct(self) -> StructType { + let (key, value) = *self.0; + Self::struct_type_for_map(key, value) + } + + pub fn into_list(self) -> DataType { + DataType::List(Box::new(DataType::Struct(self.into_struct()))) + } + + /// String and integral types are allowed. 
+ /// + /// This is similar to [Protobuf](https://protobuf.dev/programming-guides/proto3/#maps)'s + /// decision. + /// + /// Note that this isn't definitive. + /// Just be conservative at the beginning, but not too restrictive (like only allowing strings). + pub fn check_key_type_valid(data_type: &DataType) -> anyhow::Result<()> { + let ok = match data_type { + DataType::Int16 | DataType::Int32 | DataType::Int64 => true, + DataType::Varchar => true, + DataType::Boolean + | DataType::Float32 + | DataType::Float64 + | DataType::Decimal + | DataType::Date + | DataType::Time + | DataType::Timestamp + | DataType::Timestamptz + | DataType::Interval + | DataType::Struct(_) + | DataType::List(_) + | DataType::Bytea + | DataType::Jsonb + | DataType::Serial + | DataType::Int256 + | DataType::Map(_) => false, + }; + if !ok { + Err(anyhow::anyhow!("invalid map key type: {data_type}")) + } else { + Ok(()) + } + } +} + +impl FromStr for MapType { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + if !(s.starts_with("map(") && s.ends_with(')')) { + return Err(anyhow::anyhow!("expect map(...,...)")); + }; + if let Some((key, value)) = s[4..s.len() - 1].split(',').collect_tuple() { + let key = key.parse().context("failed to parse map key type")?; + let value = value.parse().context("failed to parse map value type")?; + MapType::try_from_kv(key, value) + } else { + Err(anyhow::anyhow!("expect map(...,...)")) + } + } +} + +impl std::fmt::Display for MapType { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "map({},{})", self.key(), self.value()) + } +} diff --git a/src/common/src/types/mod.rs b/src/common/src/types/mod.rs index e53a4597301b0..f73221d9cbe40 100644 --- a/src/common/src/types/mod.rs +++ b/src/common/src/types/mod.rs @@ -37,7 +37,8 @@ use thiserror_ext::AsReport; use crate::array::{ ArrayBuilderImpl, ArrayError, ArrayResult, PrimitiveArrayItemType, NULL_VAL_FOR_HASH, }; -pub use crate::array::{ListRef, ListValue, StructRef, StructValue}; +// Complex type's value is based on the array +pub use crate::array::{ListRef, ListValue, MapRef, MapValue, StructRef, StructValue}; use crate::cast::{str_to_bool, str_to_bytea}; use crate::error::BoxedError; use crate::{ @@ -53,6 +54,7 @@ mod from_sql; mod interval; mod jsonb; mod macros; +mod map_type; mod native_type; mod num256; mod ops; @@ -78,6 +80,7 @@ pub use self::datetime::{Date, Time, Timestamp}; pub use self::decimal::{Decimal, PowError as DecimalPowError}; pub use self::interval::{test_utils, DateTimeField, Interval, IntervalDisplay}; pub use self::jsonb::{JsonbRef, JsonbVal}; +pub use self::map_type::MapType; pub use self::native_type::*; pub use self::num256::{Int256, Int256Ref}; pub use self::ops::{CheckedAdd, IsNegative}; @@ -99,8 +102,13 @@ pub type F32 = ordered_float::OrderedFloat; pub type F64 = ordered_float::OrderedFloat; /// The set of datatypes that are supported in RisingWave. -// `EnumDiscriminants` will generate a `DataTypeName` enum with the same variants, -// but without data fields. +/// +/// # Trait implementations +/// +/// - `EnumDiscriminants` generates [`DataTypeName`] enum with the same variants, +/// but without data fields. +/// - `FromStr` is only used internally for tests. +/// The generated implementation isn't efficient, and doesn't handle whitespaces, etc. 
#[derive( Debug, Display, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, EnumDiscriminants, FromStr, )] @@ -166,8 +174,12 @@ pub enum DataType { #[display("rw_int256")] #[from_str(regex = "(?i)^rw_int256$")] Int256, + #[display("{0}")] + #[from_str(regex = "(?i)^(?P<0>.+)$")] + Map(MapType), } +// For DataType::List impl std::str::FromStr for Box { type Err = BoxedError; @@ -200,8 +212,8 @@ impl TryFrom for DataType { DataTypeName::Time => Ok(DataType::Time), DataTypeName::Interval => Ok(DataType::Interval), DataTypeName::Jsonb => Ok(DataType::Jsonb), - DataTypeName::Struct | DataTypeName::List => { - Err("Functions returning struct or list can not be inferred. Please use `FunctionCall::new_unchecked`.") + DataTypeName::Struct | DataTypeName::List | DataTypeName::Map => { + Err("Functions returning composite types can not be inferred. Please use `FunctionCall::new_unchecked`.") } } } @@ -236,6 +248,12 @@ impl From<&PbDataType> for DataType { // The first (and only) item is the list element type. Box::new((&proto.field_type[0]).into()), ), + PbTypeName::Map => { + // Map is physically the same as a list. + // So the first (and only) item is the list element type. + let list_entries_type: DataType = (&proto.field_type[0]).into(); + DataType::Map(MapType::from_list_entries(list_entries_type)) + } PbTypeName::Int256 => DataType::Int256, } } @@ -263,6 +281,7 @@ impl From for PbTypeName { DataTypeName::Struct => PbTypeName::Struct, DataTypeName::List => PbTypeName::List, DataTypeName::Int256 => PbTypeName::Int256, + DataTypeName::Map => PbTypeName::Map, } } } @@ -324,6 +343,11 @@ impl DataType { DataType::List(datatype) => { pb.field_type = vec![datatype.to_protobuf()]; } + DataType::Map(datatype) => { + // Same as List> + pb.field_type = + vec![DataType::Struct(datatype.clone().into_struct()).to_protobuf()]; + } DataType::Boolean | DataType::Int16 | DataType::Int32 @@ -366,6 +390,10 @@ impl DataType { matches!(self, DataType::Struct(_)) } + pub fn is_map(&self) -> bool { + matches!(self, DataType::Map(_)) + } + pub fn is_int(&self) -> bool { matches!(self, DataType::Int16 | DataType::Int32 | DataType::Int64) } @@ -383,14 +411,32 @@ impl DataType { Self::Struct(StructType::from_parts(field_names, fields)) } + pub fn new_unnamed_struct(fields: Vec) -> Self { + Self::Struct(StructType::unnamed(fields)) + } + pub fn as_struct(&self) -> &StructType { match self { DataType::Struct(t) => t, - _ => panic!("expect struct type"), + t => panic!("expect struct type, got {t}"), } } - /// Returns the inner type of a list type. + pub fn as_map(&self) -> &MapType { + match self { + DataType::Map(t) => t, + t => panic!("expect map type, got {t}"), + } + } + + pub fn into_map(self) -> MapType { + match self { + DataType::Map(t) => t, + t => panic!("expect map type, got {t}"), + } + } + + /// Returns the inner element's type of a list type. /// /// # Panics /// @@ -398,11 +444,13 @@ impl DataType { pub fn as_list(&self) -> &DataType { match self { DataType::List(t) => t, - _ => panic!("expect list type"), + t => panic!("expect list type, got {t}"), } } - /// Return a new type that removes the outer list. + /// Return a new type that removes the outer list, and get the innermost element type. + /// + /// Use [`DataType::as_list`] if you only want the element type of a list. 
/// /// ``` /// use risingwave_common::types::DataType::*; @@ -447,6 +495,10 @@ impl From for PbDataType { mod private { use super::*; + // Note: put pub trait inside a private mod just makes the name private, + // The trait methods will still be publicly available... + // a.k.a. ["Voldemort type"](https://rust-lang.github.io/rfcs/2145-type-privacy.html#lint-3-voldemort-types-its-reachable-but-i-cant-name-it) + /// Common trait bounds of scalar and scalar reference types. /// /// NOTE(rc): `Hash` is not in the trait bound list, it's implemented as [`ScalarRef::hash_scalar`]. @@ -610,7 +662,7 @@ macro_rules! impl_self_as_scalar_ref { )* }; } -impl_self_as_scalar_ref! { &str, &[u8], Int256Ref<'_>, JsonbRef<'_>, ListRef<'_>, StructRef<'_>, ScalarRefImpl<'_> } +impl_self_as_scalar_ref! { &str, &[u8], Int256Ref<'_>, JsonbRef<'_>, ListRef<'_>, StructRef<'_>, ScalarRefImpl<'_>, MapRef<'_> } /// `for_all_native_types` includes all native variants of our scalar types. /// @@ -831,7 +883,7 @@ impl ScalarImpl { .ok_or_else(|| "invalid value of Jsonb".to_string())?, ), DataType::Int256 => Self::Int256(Int256::from_binary(bytes)?), - DataType::Struct(_) | DataType::List(_) => { + DataType::Struct(_) | DataType::List(_) | DataType::Map(_) => { return Err(format!("unsupported data type: {}", data_type).into()); } }; @@ -864,6 +916,9 @@ impl ScalarImpl { DataType::Struct(_) => StructValue::from_str(s, data_type)?.into(), DataType::Jsonb => JsonbVal::from_str(s)?.into(), DataType::Bytea => str_to_bytea(s)?.into(), + DataType::Map(_) => { + todo!() + } }) } } @@ -930,7 +985,7 @@ impl ScalarRefImpl<'_> { self.to_text_with_type(data_type) } - /// Serialize the scalar. + /// Serialize the scalar into the `memcomparable` format. pub fn serialize( &self, ser: &mut memcomparable::Serializer, @@ -961,6 +1016,7 @@ impl ScalarRefImpl<'_> { Self::Jsonb(v) => v.memcmp_serialize(ser)?, Self::Struct(v) => v.memcmp_serialize(ser)?, Self::List(v) => v.memcmp_serialize(ser)?, + Self::Map(v) => v.memcmp_serialize(ser)?, }; Ok(()) } @@ -1015,6 +1071,7 @@ impl ScalarImpl { Ty::Jsonb => Self::Jsonb(JsonbVal::memcmp_deserialize(de)?), Ty::Struct(t) => StructValue::memcmp_deserialize(t.types(), de)?.to_scalar_value(), Ty::List(t) => ListValue::memcmp_deserialize(t, de)?.to_scalar_value(), + Ty::Map(t) => MapValue::memcmp_deserialize(t, de)?.to_scalar_value(), }) } @@ -1194,6 +1251,10 @@ mod tests { ScalarImpl::List(ListValue::from_iter([233i64, 2333])), DataType::List(Box::new(DataType::Int64)), ), + DataTypeName::Map => { + // map is not hashable + continue; + } }; test(Some(scalar), data_type.clone()); diff --git a/src/common/src/types/postgres_type.rs b/src/common/src/types/postgres_type.rs index ae147e9c9660e..d85f08ed59cc3 100644 --- a/src/common/src/types/postgres_type.rs +++ b/src/common/src/types/postgres_type.rs @@ -54,6 +54,12 @@ pub struct UnsupportedOid(i32); /// Get type information compatible with Postgres type, such as oid, type length. impl DataType { + /// For a fixed-size type, typlen is the number of bytes in the internal representation of the type. + /// But for a variable-length type, typlen is negative. + /// -1 indicates a “varlena” type (one that has a length word), + /// -2 indicates a null-terminated C string. + /// + /// pub fn type_len(&self) -> i16 { macro_rules! 
impl_type_len { ($( { $enum:ident | $oid:literal | $oid_array:literal | $name:ident | $input:ident | $len:literal } )*) => { @@ -63,7 +69,7 @@ impl DataType { )* DataType::Serial => 8, DataType::Int256 => -1, - DataType::List(_) | DataType::Struct(_) => -1, + DataType::List(_) | DataType::Struct(_) | DataType::Map(_) => -1, } } } @@ -96,6 +102,7 @@ impl DataType { for_all_base_types! { impl_from_oid } } + /// Refer to [`Self::from_oid`] pub fn to_oid(&self) -> i32 { macro_rules! impl_to_oid { ($( { $enum:ident | $oid:literal | $oid_array:literal | $name:ident | $input:ident | $len:literal } )*) => { @@ -111,10 +118,14 @@ impl DataType { DataType::Serial => 1016, DataType::Struct(_) => -1, DataType::List { .. } => unreachable!("Never reach here!"), + DataType::Map(_) => 1304, } DataType::Serial => 20, + // XXX: what does the oid mean here? Why we don't have from_oid for them? DataType::Int256 => 1301, + DataType::Map(_) => 1303, // TODO: Support to give a new oid for custom struct type. #9434 + // 1043 is varchar DataType::Struct(_) => 1043, } } @@ -133,6 +144,7 @@ impl DataType { DataType::List(_) => "list", DataType::Serial => "serial", DataType::Int256 => "rw_int256", + DataType::Map(_) => "map", } } } diff --git a/src/common/src/types/struct_type.rs b/src/common/src/types/struct_type.rs index a18f452af7a74..edc4b73311533 100644 --- a/src/common/src/types/struct_type.rs +++ b/src/common/src/types/struct_type.rs @@ -37,11 +37,11 @@ impl Debug for StructType { #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] struct StructTypeInner { - // Details about a struct type. There are 2 cases for a struct: - // 1. `field_names.len() == field_types.len()`: it represents a struct with named fields, - // e.g. `STRUCT`. - // 2. `field_names.len() == 0`: it represents a struct with unnamed fields, - // e.g. `ROW(1, 2)`. + /// Details about a struct type. There are 2 cases for a struct: + /// 1. `field_names.len() == field_types.len()`: it represents a struct with named fields, + /// e.g. `STRUCT`. + /// 2. `field_names.len() == 0`: it represents a struct with unnamed fields, + /// e.g. `ROW(1, 2)`. 
field_names: Box<[String]>, field_types: Box<[DataType]>, } @@ -71,6 +71,8 @@ impl StructType { } pub(super) fn from_parts(field_names: Vec, field_types: Vec) -> Self { + // TODO: enable this assertion + // debug_assert!(field_names.len() == field_types.len()); Self(Arc::new(StructTypeInner { field_types: field_types.into(), field_names: field_names.into(), diff --git a/src/common/src/types/to_binary.rs b/src/common/src/types/to_binary.rs index 56eea301f3f61..da7f75f0a2a3f 100644 --- a/src/common/src/types/to_binary.rs +++ b/src/common/src/types/to_binary.rs @@ -102,6 +102,7 @@ impl ToBinary for ScalarRefImpl<'_> { issue = 7949, "the pgwire extended-mode encoding for {ty} is unsupported" ), + ScalarRefImpl::Map(_) => todo!(), } } } diff --git a/src/common/src/types/to_sql.rs b/src/common/src/types/to_sql.rs index 3ece8a574c450..57aab11daf4d7 100644 --- a/src/common/src/types/to_sql.rs +++ b/src/common/src/types/to_sql.rs @@ -46,6 +46,7 @@ impl ToSql for ScalarImpl { ScalarImpl::Int256(_) | ScalarImpl::Struct(_) | ScalarImpl::List(_) => { bail_not_implemented!("the postgres encoding for {ty} is unsupported") } + ScalarImpl::Map(_) => todo!(), } } diff --git a/src/common/src/util/memcmp_encoding.rs b/src/common/src/util/memcmp_encoding.rs index 5a5ad598093af..c9de13531b7fe 100644 --- a/src/common/src/util/memcmp_encoding.rs +++ b/src/common/src/util/memcmp_encoding.rs @@ -151,9 +151,9 @@ fn calculate_encoded_size_inner( deserializer.deserialize_decimal()?; 0 // the len is not used since decimal is not a fixed length type } - // these two types is var-length and should only be determine at runtime. + // these types are var-length and should only be determine at runtime. // TODO: need some test for this case (e.g. e2e test) - DataType::List { .. } => deserializer.skip_bytes()?, + DataType::List { .. } | DataType::Map(_) => deserializer.skip_bytes()?, DataType::Struct(t) => t .types() .map(|field| { diff --git a/src/common/src/util/value_encoding/mod.rs b/src/common/src/util/value_encoding/mod.rs index 322c542557ed7..d72dcdc7303f3 100644 --- a/src/common/src/util/value_encoding/mod.rs +++ b/src/common/src/util/value_encoding/mod.rs @@ -13,7 +13,8 @@ // limitations under the License. //! Value encoding is an encoding format which converts the data into a binary form (not -//! memcomparable). +//! memcomparable, i.e., Key encoding). 
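// Editor's note (hedged illustration, not part of this module): the map arms added below
// reuse the list encoding, since a map value is physically its `list<struct<key, value>>`
// of entries:
//
//   ScalarRefImpl::Map(m) => serialize_list(m.into_inner(), buf)
//
// and deserialization goes through `deserialize_list` with the entries' struct type,
// then re-wraps the result with `MapValue::from_list_entries`.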
+ use bytes::{Buf, BufMut}; use chrono::{Datelike, Timelike}; use either::{for_both, Either}; @@ -226,6 +227,7 @@ fn serialize_scalar(value: ScalarRefImpl<'_>, buf: &mut impl BufMut) { ScalarRefImpl::Jsonb(v) => serialize_str(&v.value_serialize(), buf), ScalarRefImpl::Struct(s) => serialize_struct(s, buf), ScalarRefImpl::List(v) => serialize_list(v, buf), + ScalarRefImpl::Map(m) => serialize_list(m.into_inner(), buf), } } @@ -251,6 +253,7 @@ fn estimate_serialize_scalar_size(value: ScalarRefImpl<'_>) -> usize { ScalarRefImpl::Jsonb(v) => v.capacity(), ScalarRefImpl::Struct(s) => estimate_serialize_struct_size(s), ScalarRefImpl::List(v) => estimate_serialize_list_size(v), + ScalarRefImpl::Map(v) => estimate_serialize_list_size(v.into_inner()), } } @@ -354,6 +357,12 @@ fn deserialize_value(ty: &DataType, data: &mut impl Buf) -> Result { DataType::Struct(struct_def) => deserialize_struct(struct_def, data)?, DataType::Bytea => ScalarImpl::Bytea(deserialize_bytea(data).into()), DataType::List(item_type) => deserialize_list(item_type, data)?, + DataType::Map(map_type) => { + // FIXME: clone type everytime here is inefficient + let list = deserialize_list(&DataType::Struct(map_type.clone().into_struct()), data)? + .into_list(); + ScalarImpl::Map(MapValue::from_list_entries(list)) + } }) } diff --git a/src/connector/src/parser/mysql.rs b/src/connector/src/parser/mysql.rs index a28dddc9aa65a..fe9b77c643de7 100644 --- a/src/connector/src/parser/mysql.rs +++ b/src/connector/src/parser/mysql.rs @@ -127,8 +127,10 @@ pub fn mysql_row_to_owned_row(mysql_row: &mut MysqlRow, schema: &Schema) -> Owne | DataType::Struct(_) | DataType::List(_) | DataType::Int256 - | DataType::Serial => { + | DataType::Serial + | DataType::Map(_) => { // Interval, Struct, List, Int256 are not supported + // XXX: is this branch reachable? if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { tracing::warn!(column = rw_field.name, ?rw_field.data_type, suppressed_count, "unsupported data type, set to null"); } diff --git a/src/connector/src/parser/postgres.rs b/src/connector/src/parser/postgres.rs index da17ea256ba3c..f55fe28f878f9 100644 --- a/src/connector/src/parser/postgres.rs +++ b/src/connector/src/parser/postgres.rs @@ -116,7 +116,8 @@ fn postgres_cell_to_scalar_impl( } } }, - DataType::Struct(_) | DataType::Serial => { + DataType::Struct(_) | DataType::Serial | DataType::Map(_) => { + // Is this branch reachable? 
// Struct and Serial are not supported tracing::warn!(name, ?data_type, "unsupported data type, set to null"); None diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs index bbe0139caaa46..ebd18fed2063b 100644 --- a/src/connector/src/sink/big_query.rs +++ b/src/connector/src/sink/big_query.rs @@ -261,6 +261,7 @@ impl BigQuerySink { DataType::Int256 => Err(SinkError::BigQuery(anyhow::anyhow!( "Bigquery cannot support Int256" ))), + DataType::Map(_) => todo!(), } } @@ -310,6 +311,7 @@ impl BigQuerySink { "Bigquery cannot support Int256" ))) } + DataType::Map(_) => todo!(), }; Ok(tfs) } @@ -816,6 +818,7 @@ fn build_protobuf_field( "Don't support Float32 and Int256" ))) } + DataType::Map(_) => todo!(), } Ok((field, None)) } diff --git a/src/connector/src/sink/clickhouse.rs b/src/connector/src/sink/clickhouse.rs index d715e93b8d6c4..4337f2b9d76b7 100644 --- a/src/connector/src/sink/clickhouse.rs +++ b/src/connector/src/sink/clickhouse.rs @@ -473,6 +473,9 @@ impl ClickHouseSink { risingwave_common::types::DataType::Int256 => Err(SinkError::ClickHouse( "clickhouse can not support Int256".to_string(), )), + risingwave_common::types::DataType::Map(_) => Err(SinkError::ClickHouse( + "clickhouse can not support Map".to_string(), + )), }; if !is_match? { return Err(SinkError::ClickHouse(format!( @@ -1020,6 +1023,11 @@ impl ClickHouseFieldWithNull { "clickhouse can not support Bytea".to_string(), )) } + ScalarRefImpl::Map(_) => { + return Err(SinkError::ClickHouse( + "clickhouse can not support Map".to_string(), + )) + } }; let data = if clickhouse_schema_feature.can_null { vec![ClickHouseFieldWithNull::WithSome(data)] diff --git a/src/connector/src/sink/doris.rs b/src/connector/src/sink/doris.rs index 7745f2e9e98b1..0571c9a2bd6bc 100644 --- a/src/connector/src/sink/doris.rs +++ b/src/connector/src/sink/doris.rs @@ -188,6 +188,9 @@ impl DorisSink { risingwave_common::types::DataType::Int256 => { Err(SinkError::Doris("doris can not support Int256".to_string())) } + risingwave_common::types::DataType::Map(_) => { + Err(SinkError::Doris("doris can not support Map".to_string())) + } } } } diff --git a/src/connector/src/sink/dynamodb.rs b/src/connector/src/sink/dynamodb.rs index 35b48c6e31faf..2df15f517ca0b 100644 --- a/src/connector/src/sink/dynamodb.rs +++ b/src/connector/src/sink/dynamodb.rs @@ -395,6 +395,7 @@ fn map_data_type( } AttributeValue::M(map) } + DataType::Map(_) => todo!(), }; Ok(attr) } diff --git a/src/connector/src/sink/encoder/avro.rs b/src/connector/src/sink/encoder/avro.rs index 8122126727298..4a2060f0a8c6c 100644 --- a/src/connector/src/sink/encoder/avro.rs +++ b/src/connector/src/sink/encoder/avro.rs @@ -454,6 +454,10 @@ fn encode_field( DataType::Int256 => { return no_match_err(); } + DataType::Map(_) => { + // TODO: + return no_match_err(); + } }; D::handle_union(value, opt_idx) diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs index 3652f38bacbb2..6dc8809f42933 100644 --- a/src/connector/src/sink/encoder/json.rs +++ b/src/connector/src/sink/encoder/json.rs @@ -401,6 +401,7 @@ pub(crate) fn schema_type_mapping(rw_type: &DataType) -> &'static str { DataType::Jsonb => "string", DataType::Serial => "string", DataType::Int256 => "string", + DataType::Map(_) => "map", } } diff --git a/src/connector/src/sink/encoder/proto.rs b/src/connector/src/sink/encoder/proto.rs index a0e4d41dc58de..8046606b5690c 100644 --- a/src/connector/src/sink/encoder/proto.rs +++ b/src/connector/src/sink/encoder/proto.rs @@ -420,6 
+420,10 @@ fn encode_field( DataType::Int256 => { return no_match_err(); } + DataType::Map(_) => { + // TODO: + return no_match_err(); + } }; Ok(value) diff --git a/src/connector/src/sink/formatter/debezium_json.rs b/src/connector/src/sink/formatter/debezium_json.rs index a9bf0404f473e..9fff8a9b8ba6b 100644 --- a/src/connector/src/sink/formatter/debezium_json.rs +++ b/src/connector/src/sink/formatter/debezium_json.rs @@ -314,6 +314,7 @@ pub(crate) fn field_to_json(field: &Field) -> Value { // we do the same here risingwave_common::types::DataType::Struct(_) => ("string", ""), risingwave_common::types::DataType::List { .. } => ("string", ""), + risingwave_common::types::DataType::Map(_) => ("string", ""), }; if name.is_empty() { diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 23ea54944f11c..eb62b7fbc8cd8 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -211,7 +211,7 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorRe ))) } }, - DataType::Serial | DataType::Int256 => Err(SinkError::Remote(anyhow!( + DataType::Serial | DataType::Int256 | DataType::Map(_) => Err(SinkError::Remote(anyhow!( "remote sink supports Int16, Int32, Int64, Float32, Float64, Boolean, Decimal, Time, Date, Interval, Jsonb, Timestamp, Timestamptz, Bytea, List and Varchar, (Es sink support Struct) got {:?}: {:?}", col.name, col.data_type, diff --git a/src/connector/src/sink/sqlserver.rs b/src/connector/src/sink/sqlserver.rs index 0b8c7fa1f779e..aa16c4d3c3714 100644 --- a/src/connector/src/sink/sqlserver.rs +++ b/src/connector/src/sink/sqlserver.rs @@ -559,6 +559,7 @@ fn bind_params( ScalarRefImpl::List(_) => return Err(data_type_not_supported("List")), ScalarRefImpl::Int256(_) => return Err(data_type_not_supported("Int256")), ScalarRefImpl::Serial(_) => return Err(data_type_not_supported("Serial")), + ScalarRefImpl::Map(_) => return Err(data_type_not_supported("Map")), }, None => match schema[col_idx].data_type { DataType::Boolean => { @@ -606,6 +607,7 @@ fn bind_params( DataType::Jsonb => return Err(data_type_not_supported("Jsonb")), DataType::Serial => return Err(data_type_not_supported("Serial")), DataType::Int256 => return Err(data_type_not_supported("Int256")), + DataType::Map(_) => return Err(data_type_not_supported("Map")), }, }; } @@ -639,6 +641,7 @@ fn check_data_type_compatibility(data_type: &DataType) -> Result<()> { DataType::Jsonb => Err(data_type_not_supported("Jsonb")), DataType::Serial => Err(data_type_not_supported("Serial")), DataType::Int256 => Err(data_type_not_supported("Int256")), + DataType::Map(_) => Err(data_type_not_supported("Map")), } } diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index e5881ee9f747e..84d3f95131758 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -246,6 +246,9 @@ impl StarrocksSink { risingwave_common::types::DataType::Int256 => Err(SinkError::Starrocks( "INT256 is not supported for Starrocks sink.".to_string(), )), + risingwave_common::types::DataType::Map(_) => Err(SinkError::Starrocks( + "MAP is not supported for Starrocks sink.".to_string(), + )), } } } diff --git a/src/expr/core/src/error.rs b/src/expr/core/src/error.rs index e02c5f4521cf5..4bceb284fbfd9 100644 --- a/src/expr/core/src/error.rs +++ b/src/expr/core/src/error.rs @@ -88,6 +88,7 @@ pub enum ExprError { #[error("More than one row returned by {0} used as an expression")] MaxOneRow(&'static str), + /// TODO: deprecate in 
favor of `Function` #[error(transparent)] Internal( #[from] @@ -111,6 +112,7 @@ pub enum ExprError { InvalidState(String), /// Function error message returned by UDF. + /// TODO: replace with `Function` #[error("{0}")] Custom(String), diff --git a/src/expr/core/src/sig/mod.rs b/src/expr/core/src/sig/mod.rs index c3f57acd69f56..ae5af5b57c649 100644 --- a/src/expr/core/src/sig/mod.rs +++ b/src/expr/core/src/sig/mod.rs @@ -395,6 +395,8 @@ pub enum SigDataType { AnyArray, /// Accepts any struct data type AnyStruct, + /// TODO: not all type can be used as a map key. + AnyMap, } impl From for SigDataType { @@ -410,6 +412,7 @@ impl std::fmt::Display for SigDataType { Self::Any => write!(f, "any"), Self::AnyArray => write!(f, "anyarray"), Self::AnyStruct => write!(f, "anystruct"), + Self::AnyMap => write!(f, "anymap"), } } } @@ -422,6 +425,7 @@ impl SigDataType { Self::Any => true, Self::AnyArray => dt.is_array(), Self::AnyStruct => dt.is_struct(), + Self::AnyMap => dt.is_map(), } } diff --git a/src/expr/impl/src/scalar/array.rs b/src/expr/impl/src/scalar/array.rs index aaefd17bba07d..48ee281b63c2d 100644 --- a/src/expr/impl/src/scalar/array.rs +++ b/src/expr/impl/src/scalar/array.rs @@ -14,9 +14,9 @@ use risingwave_common::array::{ListValue, StructValue}; use risingwave_common::row::Row; -use risingwave_common::types::ToOwnedDatum; +use risingwave_common::types::{DataType, ListRef, MapType, MapValue, ToOwnedDatum}; use risingwave_expr::expr::Context; -use risingwave_expr::function; +use risingwave_expr::{function, ExprError}; #[function("array(...) -> anyarray", type_infer = "panic")] fn array(row: impl Row, ctx: &Context) -> ListValue { @@ -28,6 +28,32 @@ fn row_(row: impl Row) -> StructValue { StructValue::new(row.iter().map(|d| d.to_owned_datum()).collect()) } +fn map_type_infer(args: &[DataType]) -> Result { + let map = MapType::try_from_kv(args[0].as_list().clone(), args[1].as_list().clone())?; + Ok(map.into()) +} + +/// # Example +/// +/// ```slt +/// query T +/// select map_from_entries(null::int[], array[1,2,3]); +/// ---- +/// NULL +/// +/// query T +/// select map_from_entries(array['a','b','c'], array[1,2,3]); +/// ---- +/// {"a":1,"b":2,"c":3} +/// ``` +#[function( + "map_from_entries(anyarray, anyarray) -> anymap", + type_infer = "map_type_infer" +)] +fn map(key: ListRef<'_>, value: ListRef<'_>) -> Result { + MapValue::try_from_kv(key.to_owned(), value.to_owned()).map_err(ExprError::Custom) +} + #[cfg(test)] mod tests { use risingwave_common::array::DataChunk; diff --git a/src/expr/impl/src/scalar/to_jsonb.rs b/src/expr/impl/src/scalar/to_jsonb.rs index bb381954cc76b..c11d4474dc43b 100644 --- a/src/expr/impl/src/scalar/to_jsonb.rs +++ b/src/expr/impl/src/scalar/to_jsonb.rs @@ -16,8 +16,8 @@ use std::fmt::Debug; use jsonbb::Builder; use risingwave_common::types::{ - DataType, Date, Decimal, Int256Ref, Interval, JsonbRef, JsonbVal, ListRef, ScalarRefImpl, - Serial, StructRef, Time, Timestamp, Timestamptz, ToText, F32, F64, + DataType, Date, Decimal, Int256Ref, Interval, JsonbRef, JsonbVal, ListRef, MapRef, + ScalarRefImpl, Serial, StructRef, Time, Timestamp, Timestamptz, ToText, F32, F64, }; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_expr::expr::Context; @@ -72,6 +72,7 @@ impl ToJsonb for ScalarRefImpl<'_> { Timestamptz(v) => v.add_to(ty, builder), Struct(v) => v.add_to(ty, builder), List(v) => v.add_to(ty, builder), + Map(v) => v.add_to(ty, builder), } } } @@ -227,6 +228,20 @@ impl ToJsonb for ListRef<'_> { } } +impl ToJsonb for MapRef<'_> { + fn add_to(self, 
data_type: &DataType, builder: &mut Builder) -> Result<()> { + let value_type = data_type.as_map().value(); + builder.begin_object(); + for (k, v) in self.iter() { + // XXX: is to_text here reasonable? + builder.add_string(&k.to_text()); + v.add_to(value_type, builder)?; + } + builder.end_object(); + Ok(()) + } +} + impl ToJsonb for StructRef<'_> { fn add_to(self, data_type: &DataType, builder: &mut Builder) -> Result<()> { builder.begin_object(); diff --git a/src/expr/impl/src/udf/wasm.rs b/src/expr/impl/src/udf/wasm.rs index bd84cfa004326..5a0dd0420b4d9 100644 --- a/src/expr/impl/src/udf/wasm.rs +++ b/src/expr/impl/src/udf/wasm.rs @@ -279,5 +279,6 @@ fn datatype_name(ty: &DataType) -> String { .map(|(name, ty)| format!("{}:{}", name, datatype_name(ty))) .join(",") ), + DataType::Map(_m) => todo!("map in wasm udf"), } } diff --git a/src/expr/macro/src/gen.rs b/src/expr/macro/src/gen.rs index ba51f4ba6bf54..9b25f4b2557b7 100644 --- a/src/expr/macro/src/gen.rs +++ b/src/expr/macro/src/gen.rs @@ -83,9 +83,10 @@ impl FunctionAttr { attrs } - /// Generate the type infer function. + /// Generate the type infer function: `fn(&[DataType]) -> Result` fn generate_type_infer_fn(&self) -> Result { if let Some(func) = &self.type_infer { + // XXX: should this be called "placeholder" or "unreachable"? if func == "panic" { return Ok(quote! { |_| panic!("type inference function is not implemented") }); } @@ -115,6 +116,11 @@ impl FunctionAttr { // infer as the type of "struct" argument return Ok(quote! { |args| Ok(args[#i].clone()) }); } + } else if self.ret == "anymap" { + if let Some(i) = self.args.iter().position(|t| t == "anymap") { + // infer as the type of "anymap" argument + return Ok(quote! { |args| Ok(args[#i].clone()) }); + } } else { // the return type is fixed let ty = data_type(&self.ret); @@ -122,13 +128,17 @@ impl FunctionAttr { } Err(Error::new( Span::call_site(), - "type inference function is required", + "type inference function cannot be automatically derived. You should provide: `type_infer = \"|args| Ok(...)\"`", )) } - /// Generate a descriptor of the scalar or table function. + /// Generate a descriptor (`FuncSign`) of the scalar or table function. /// /// The types of arguments and return value should not contain wildcard. + /// + /// # Arguments + /// `build_fn`: whether the user provided a function is a build function. + /// (from the `#[build_function]` macro) pub fn generate_function_descriptor( &self, user_fn: &UserFunctionAttr, @@ -156,6 +166,7 @@ impl FunctionAttr { } else if self.rewritten { quote! { |_, _| Err(ExprError::UnsupportedFunction(#name.into())) } } else { + // This is the core logic for `#[function]` self.generate_build_scalar_function(user_fn, true)? }; let type_infer_fn = self.generate_type_infer_fn()?; @@ -1302,6 +1313,7 @@ fn sig_data_type(ty: &str) -> TokenStream2 { match ty { "any" => quote! { SigDataType::Any }, "anyarray" => quote! { SigDataType::AnyArray }, + "anymap" => quote! { SigDataType::AnyMap }, "struct" => quote! { SigDataType::AnyStruct }, _ if ty.starts_with("struct") && ty.contains("any") => quote! { SigDataType::AnyStruct }, _ => { @@ -1320,6 +1332,12 @@ fn data_type(ty: &str) -> TokenStream2 { return quote! { DataType::Struct(#ty.parse().expect("invalid struct type")) }; } let variant = format_ident!("{}", types::data_type(ty)); + // TODO: enable the check + // assert!( + // !matches!(ty, "any" | "anyarray" | "anymap" | "struct"), + // "{ty}, {variant}" + // ); + quote! 
diff --git a/src/expr/macro/src/gen.rs b/src/expr/macro/src/gen.rs
index ba51f4ba6bf54..9b25f4b2557b7 100644
--- a/src/expr/macro/src/gen.rs
+++ b/src/expr/macro/src/gen.rs
@@ -83,9 +83,10 @@ impl FunctionAttr {
        attrs
    }

-    /// Generate the type infer function.
+    /// Generate the type infer function: `fn(&[DataType]) -> Result<DataType>`
    fn generate_type_infer_fn(&self) -> Result<TokenStream2> {
        if let Some(func) = &self.type_infer {
+            // XXX: should this be called "placeholder" or "unreachable"?
            if func == "panic" {
                return Ok(quote! { |_| panic!("type inference function is not implemented") });
            }
@@ -115,6 +116,11 @@ impl FunctionAttr {
                // infer as the type of "struct" argument
                return Ok(quote! { |args| Ok(args[#i].clone()) });
            }
+        } else if self.ret == "anymap" {
+            if let Some(i) = self.args.iter().position(|t| t == "anymap") {
+                // infer as the type of the "anymap" argument
+                return Ok(quote! { |args| Ok(args[#i].clone()) });
+            }
        } else {
            // the return type is fixed
            let ty = data_type(&self.ret);
@@ -122,13 +128,17 @@ impl FunctionAttr {
        }
        Err(Error::new(
            Span::call_site(),
-            "type inference function is required",
+            "type inference function cannot be automatically derived. You should provide: `type_infer = \"|args| Ok(...)\"`",
        ))
    }

-    /// Generate a descriptor of the scalar or table function.
+    /// Generate a descriptor (`FuncSign`) of the scalar or table function.
    ///
    /// The types of arguments and return value should not contain wildcard.
+    ///
+    /// # Arguments
+    /// `build_fn`: whether the user-provided function is a build function
+    /// (from the `#[build_function]` macro).
    pub fn generate_function_descriptor(
        &self,
        user_fn: &UserFunctionAttr,
@@ -156,6 +166,7 @@ impl FunctionAttr {
        } else if self.rewritten {
            quote! { |_, _| Err(ExprError::UnsupportedFunction(#name.into())) }
        } else {
+            // This is the core logic for `#[function]`
            self.generate_build_scalar_function(user_fn, true)?
        };
        let type_infer_fn = self.generate_type_infer_fn()?;
@@ -1302,6 +1313,7 @@ fn sig_data_type(ty: &str) -> TokenStream2 {
    match ty {
        "any" => quote! { SigDataType::Any },
        "anyarray" => quote! { SigDataType::AnyArray },
+        "anymap" => quote! { SigDataType::AnyMap },
        "struct" => quote! { SigDataType::AnyStruct },
        _ if ty.starts_with("struct") && ty.contains("any") => quote! { SigDataType::AnyStruct },
        _ => {
@@ -1320,6 +1332,12 @@ fn data_type(ty: &str) -> TokenStream2 {
        return quote! { DataType::Struct(#ty.parse().expect("invalid struct type")) };
    }
    let variant = format_ident!("{}", types::data_type(ty));
+    // TODO: enable the check
+    // assert!(
+    //     !matches!(ty, "any" | "anyarray" | "anymap" | "struct"),
+    //     "{ty}, {variant}"
+    // );
+
    quote! { DataType::#variant }
 }
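Under the `anymap` branch added above, a signature like `map_something(anymap) -> anymap` gets `|args| Ok(args[i].clone())` derived for it, with `i` resolved at macro-expansion time. A hedged, self-contained sketch of what that derived closure does at runtime (here `DataType` is mocked as a plain string; the index is hard-coded instead of macro-resolved):

```rust
fn main() {
    let map_arg_index = 0; // position of the `anymap` argument in the signature
    // the derived inference: return the type of the map argument unchanged
    let infer = |args: &[String]| -> Result<String, String> {
        args.get(map_arg_index)
            .cloned()
            .ok_or_else(|| "missing map argument".to_string())
    };
    let ret = infer(&["map(varchar,integer)".to_string()]);
    assert_eq!(ret.unwrap(), "map(varchar,integer)");
}
```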
diff --git a/src/expr/macro/src/lib.rs b/src/expr/macro/src/lib.rs
index 3a905165c2ee2..8fd03e344db89 100644
--- a/src/expr/macro/src/lib.rs
+++ b/src/expr/macro/src/lib.rs
@@ -30,7 +30,7 @@ mod utils;

 /// Defining the RisingWave SQL function from a Rust function.
 ///
-/// [Online version of this doc.](https://risingwavelabs.github.io/risingwave/risingwave_expr_macro/attr.function.html)
+/// [Online version of this doc.](https://risingwavelabs.github.io/risingwave/rustdoc/risingwave_expr_macro/attr.function.html)
 ///
 /// # Table of Contents
 ///
@@ -70,8 +70,8 @@ mod utils;
 /// name ( [arg_types],* [...] ) [ -> [setof] return_type ]
 /// ```
 ///
-/// Where `name` is the function name in `snake_case`, which must match the function name defined
-/// in `prost`.
+/// Where `name` is the function name in `snake_case`, which must match the function name (in `UPPER_CASE`) defined
+/// in `proto/expr.proto`.
 ///
 /// `arg_types` is a comma-separated list of argument types. The allowed data types are listed
 /// in the `name` column of the appendix's [type matrix]. Wildcards or `auto` can also be used, as
@@ -98,7 +98,7 @@ mod utils;
 /// }
 /// ```
 ///
-/// ## Type Expansion
+/// ## Type Expansion with `*`
 ///
 /// Types can be automatically expanded to multiple types using wildcards. Here are some examples:
 ///
@@ -115,13 +115,17 @@ mod utils;
 /// #[function("cast(varchar) -> int64")]
 /// ```
 ///
-/// Please note the difference between `*` and `any`. `*` will generate a function for each type,
+/// Please note the difference between `*` and `any`: `*` will generate a function for each type,
 /// whereas `any` will only generate one function with a dynamic data type `Scalar`.
+/// This is similar to `impl T` and `dyn T` in Rust. The performance of `*` is much better than that of `any`,
+/// but we do not always prefer `*` despite its better performance: in some cases, using `any` is more convenient.
+/// For example, in array functions, the element type of `ListValue` is `Scalar(Ref)Impl`,
+/// so it is unnecessary to convert it from/into various `T`.
 ///
-/// ## Automatic Type Inference
+/// ## Automatic Type Inference with `auto`
 ///
 /// Correspondingly, the return type can be denoted as `auto` to be automatically inferred based on
-/// the input types. It will be inferred as the smallest type that can accommodate all input types.
+/// the input types. It will be inferred as the _smallest type_ that can accommodate all input types.
 ///
 /// For example, `#[function("add(*int, *int) -> auto")]` will be expanded to:
 ///
@@ -142,10 +146,10 @@ mod utils;
 /// #[function("neg(int64) -> int64")]
 /// ```
 ///
-/// ## Custom Type Inference Function
+/// ## Custom Type Inference Function with `type_infer`
 ///
 /// A few functions might have a return type that dynamically changes based on the input argument
-/// types, such as `unnest`.
+/// types, such as `unnest`. This is mainly for composite types like `anyarray`, `struct`, and `anymap`.
 ///
 /// In such cases, the `type_infer` option can be used to specify a function to infer the return
 /// type based on the input argument types. Its function signature is
@@ -163,7 +167,7 @@ mod utils;
 /// )]
 /// ```
 ///
-/// This type inference function will be invoked at the frontend.
+/// This type inference function will be invoked at the frontend (`infer_type_with_sigmap`).
 ///
 /// # Rust Function Signature
 ///
@@ -182,8 +186,9 @@ mod utils;
 ///
 /// ## Nullable Arguments
 ///
-/// The functions above will only be called when all arguments are not null. If null arguments need
-/// to be considered, the `Option` type can be used:
+/// The functions above will only be called when all arguments are not null.
+/// They return null if any argument is null.
+/// If null arguments need to be considered, the `Option` type can be used:
 ///
 /// ```ignore
 /// #[function("trim_array(anyarray, int32) -> anyarray")]
@@ -192,11 +197,11 @@ mod utils;
 /// ```
 ///
 /// This function will be called when `n` is null, but not when `array` is null.
 ///
-/// ## Return Value
+/// ## Return `NULL`s and Errors
 ///
 /// Similarly, the return value type can be one of the following:
 ///
-/// - `T`: Indicates that a non-null value is always returned, and errors will not occur.
+/// - `T`: Indicates that a non-null value is always returned (for non-null inputs), and errors will not occur.
 /// - `Option<T>`: Indicates that a null value may be returned, but errors will not occur.
 /// - `Result<T>`: Indicates that an error may occur, but a null value will not be returned.
 /// - `Result<Option<T>>`: Indicates that a null value may be returned, and an error may also occur.
@@ -419,6 +424,16 @@ pub fn function(attr: TokenStream, item: TokenStream) -> TokenStream {
    }
 }

+/// Different from `#[function]`, which implements the `Expression` trait for a Rust scalar function,
+/// `#[build_function]` is used when you have already implemented `Expression` manually.
+///
+/// The expected input is a "build" function:
+/// ```ignore
+/// fn(data_type: DataType, children: Vec<BoxedExpression>) -> Result<BoxedExpression>
+/// ```
+///
+/// It generates the function descriptor using the "build" function and
+/// registers the descriptor to the `FUNC_SIG_MAP`.
 #[proc_macro_attribute]
 pub fn build_function(attr: TokenStream, item: TokenStream) -> TokenStream {
    fn inner(attr: TokenStream, item: TokenStream) -> Result<TokenStream2> {

diff --git a/src/expr/macro/src/types.rs b/src/expr/macro/src/types.rs
index f2219a1c34bd6..4f07162d038a0 100644
--- a/src/expr/macro/src/types.rs
+++ b/src/expr/macro/src/types.rs
@@ -35,6 +35,7 @@ const TYPE_MATRIX: &str = "
    jsonb       Jsonb     JsonbArray    JsonbVal     JsonbRef<'_>       _
    anyarray    List      ListArray     ListValue    ListRef<'_>        _
    struct      Struct    StructArray   StructValue  StructRef<'_>      _
+    anymap      Map       MapArray      MapValue     MapRef<'_>         _
    any         ???       ArrayImpl     ScalarImpl   ScalarRefImpl<'_>  _
 ";

@@ -81,7 +82,7 @@ fn lookup_matrix(mut ty: &str, idx: usize) -> &str {
            None
        }
    });
-    s.unwrap_or_else(|| panic!("unknown type: {}", ty))
+    s.unwrap_or_else(|| panic!("failed to lookup type matrix: unknown type: {}", ty))
 }

 /// Expands a type wildcard string into a list of concrete types.
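To make the argument and return-value conventions documented above concrete, here is a hedged, dependency-free sketch that mirrors them without using the macro: an `Option` parameter still receives NULL explicitly, while `Result<Option<T>>` lets the function either return NULL or fail. The function name and types are illustrative only.

```rust
// Illustrative only; not a registered RisingWave function.
fn trim_to(list: &[i64], n: Option<i64>) -> Result<Option<Vec<i64>>, String> {
    match n {
        // NULL length: the function is still invoked because the parameter is `Option`
        None => Ok(None),
        Some(n) if n < 0 => Err("length must be non-negative".into()),
        Some(n) => Ok(Some(list.iter().copied().take(n as usize).collect())),
    }
}

fn main() {
    assert_eq!(trim_to(&[1, 2, 3], Some(2)), Ok(Some(vec![1, 2])));
    assert_eq!(trim_to(&[1, 2, 3], None), Ok(None));
    assert!(trim_to(&[1, 2, 3], Some(-1)).is_err());
}
```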
diff --git a/src/frontend/src/binder/expr/function/builtin_scalar.rs b/src/frontend/src/binder/expr/function/builtin_scalar.rs
index b38a36586d1e7..3987334b89ced 100644
--- a/src/frontend/src/binder/expr/function/builtin_scalar.rs
+++ b/src/frontend/src/binder/expr/function/builtin_scalar.rs
@@ -129,6 +129,9 @@ impl Binder {
            )
        }

+        // XXX: can we unify this with FUNC_SIG_MAP?
+        // For `raw_call`, it seems unnecessary to declare the function again here.
+        // For some functions, we have validation logic here. Is it still useful now?
        static HANDLES: LazyLock<HashMap<&'static str, Handle>> = LazyLock::new(|| {
            [
                (
@@ -387,6 +390,8 @@ impl Binder {
                ("jsonb_path_query_array", raw_call(ExprType::JsonbPathQueryArray)),
                ("jsonb_path_query_first", raw_call(ExprType::JsonbPathQueryFirst)),
                ("jsonb_set", raw_call(ExprType::JsonbSet)),
+                // map
+                ("map_from_entries", raw_call(ExprType::MapFromEntries)),
                // Functions that return a constant value
                ("pi", pi()),
                // greatest and least
@@ -692,6 +697,7 @@ impl Binder {
            return Ok(FunctionCall::new(func, inputs)?.into());
        }

+        // Note: for raw_call, we only check the name here. The type check is done later.
        match HANDLES.get(function_name) {
            Some(handle) => handle(self, inputs),
            None => {

diff --git a/src/frontend/src/binder/expr/value.rs b/src/frontend/src/binder/expr/value.rs
index e1fc78e884e02..5b69610f13bfe 100644
--- a/src/frontend/src/binder/expr/value.rs
+++ b/src/frontend/src/binder/expr/value.rs
@@ -212,7 +212,7 @@ impl Binder {
            .map(|e| self.bind_expr_inner(e))
            .collect::<Result<Vec<ExprImpl>>>()?;
        let data_type =
-            DataType::new_struct(exprs.iter().map(|e| e.return_type()).collect_vec(), vec![]);
+            DataType::new_unnamed_struct(exprs.iter().map(|e| e.return_type()).collect_vec());
        let expr: ExprImpl = FunctionCall::new_unchecked(ExprType::Row, exprs, data_type).into();
        Ok(expr)
    }

diff --git a/src/frontend/src/expr/literal.rs b/src/frontend/src/expr/literal.rs
index d44a1b859d289..29ac0948b6c0b 100644
--- a/src/frontend/src/expr/literal.rs
+++ b/src/frontend/src/expr/literal.rs
@@ -54,7 +54,8 @@ impl std::fmt::Debug for Literal {
            | DataType::Interval
            | DataType::Jsonb
            | DataType::Int256
-            | DataType::Struct(_) => write!(
+            | DataType::Struct(_)
+            | DataType::Map(_) => write!(
                f,
                "'{}'",
                v.as_scalar_ref_impl().to_text_with_type(&data_type)

diff --git a/src/frontend/src/expr/pure.rs b/src/frontend/src/expr/pure.rs
index dd2f353a34b0d..90aa65d8e549d 100644
--- a/src/frontend/src/expr/pure.rs
+++ b/src/frontend/src/expr/pure.rs
@@ -249,7 +249,8 @@ impl ExprVisitor for ImpureAnalyzer {
            | Type::InetNtoa
            | Type::InetAton
            | Type::QuoteLiteral
-            | Type::QuoteNullable =>
+            | Type::QuoteNullable
+            | Type::MapFromEntries =>
            // expression output is deterministic (same result for the same input)
            {
                func_call
diff --git a/src/frontend/src/expr/type_inference/func.rs b/src/frontend/src/expr/type_inference/func.rs
index 5b6a12ba58cce..1797770cc7611 100644
--- a/src/frontend/src/expr/type_inference/func.rs
+++ b/src/frontend/src/expr/type_inference/func.rs
@@ -730,6 +730,8 @@ pub fn infer_type_name<'a>(
    };

    if candidates.is_empty() {
+        // TODO: when the types mismatch, show the supported signatures for the
+        // function with the given name.
        bail_no_function!("{}", sig());
    }

diff --git a/src/frontend/src/optimizer/plan_expr_visitor/strong.rs b/src/frontend/src/optimizer/plan_expr_visitor/strong.rs
index 84b8c4f6eb8f3..d744f1ba14a14 100644
--- a/src/frontend/src/optimizer/plan_expr_visitor/strong.rs
+++ b/src/frontend/src/optimizer/plan_expr_visitor/strong.rs
@@ -291,6 +291,7 @@ impl Strong {
            | ExprType::JsonbPopulateRecord
            | ExprType::JsonbToRecord
            | ExprType::JsonbSet
+            | ExprType::MapFromEntries
            | ExprType::Vnode
            | ExprType::TestPaidTier
            | ExprType::Proctime

diff --git a/src/frontend/src/optimizer/rule/index_selection_rule.rs b/src/frontend/src/optimizer/rule/index_selection_rule.rs
index e65b249379750..548fda7b92af4 100644
--- a/src/frontend/src/optimizer/rule/index_selection_rule.rs
+++ b/src/frontend/src/optimizer/rule/index_selection_rule.rs
@@ -746,7 +746,7 @@ impl<'a> TableScanIoEstimator<'a> {
            .sum::<usize>()
    }

-    pub fn estimate_data_type_size(data_type: &DataType) -> usize {
+    fn estimate_data_type_size(data_type: &DataType) -> usize {
        use std::mem::size_of;

        match data_type {
@@ -769,6 +769,7 @@ impl<'a> TableScanIoEstimator<'a> {
            DataType::Jsonb => 20,
            DataType::Struct { .. } => 20,
            DataType::List { .. } => 20,
+            DataType::Map(_) => 20,
        }
    }

diff --git a/src/tests/sqlsmith/src/sql_gen/types.rs b/src/tests/sqlsmith/src/sql_gen/types.rs
index 2f12f96790f3d..1da66ca710e0c 100644
--- a/src/tests/sqlsmith/src/sql_gen/types.rs
+++ b/src/tests/sqlsmith/src/sql_gen/types.rs
@@ -53,6 +53,7 @@ pub(super) fn data_type_to_ast_data_type(data_type: &DataType) -> AstDataType {
                .collect(),
        ),
        DataType::List(ref typ) => AstDataType::Array(Box::new(data_type_to_ast_data_type(typ))),
+        DataType::Map(_) => todo!(),
    }
 }
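A closing note on the planner change above: `estimate_data_type_size` treats nested and variable-length types, now including maps, as a flat 20-byte guess rather than a recursive estimate. A hedged, self-contained sketch of that style of heuristic (the constants and type names here are illustrative, not the exact RisingWave figures for fixed-width types):

```rust
// Flat-size heuristic in the spirit of estimate_data_type_size (illustrative only).
fn rough_row_width(column_types: &[&str]) -> usize {
    column_types
        .iter()
        .map(|ty| match *ty {
            "int16" => 2,
            "int32" | "date" | "float32" => 4,
            "int64" | "float64" | "timestamp" => 8,
            // variable-length and nested types: flat 20-byte estimate
            "varchar" | "jsonb" | "struct" | "list" | "map" => 20,
            _ => 8,
        })
        .sum()
}

fn main() {
    // a row of (int32, varchar, map) is estimated at 4 + 20 + 20 bytes
    assert_eq!(rough_row_width(&["int32", "varchar", "map"]), 44);
}
```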