From 357279257626f5f10159d4044595928ec890a562 Mon Sep 17 00:00:00 2001
From: xxchan <xxchan22f@gmail.com>
Date: Tue, 16 Jul 2024 12:02:19 +0800
Subject: [PATCH] map: make it compie

---
 .../dev/src/design/data-model-and-encoding.md |  16 +-
 proto/data.proto                              |   2 +
 src/common/src/array/arrow/arrow_impl.rs      |  37 +
 src/common/src/array/map_array.rs             | 936 ++++++++++++++++++
 src/common/src/array/mod.rs                   |  10 +
 src/common/src/array/proto_reader.rs          |   1 +
 src/common/src/hash/key.rs                    |   3 +-
 src/common/src/test_utils/rand_array.rs       |   8 +-
 src/common/src/test_utils/rand_chunk.rs       |   9 +-
 src/common/src/types/macros.rs                |   1 +
 src/common/src/types/mod.rs                   |  59 +-
 src/common/src/types/postgres_type.rs         |   5 +
 src/common/src/types/scalar_impl.rs           |  20 +
 src/common/src/types/to_binary.rs             |   1 +
 src/common/src/types/to_sql.rs                |   1 +
 src/common/src/util/memcmp_encoding.rs        |   1 +
 src/common/src/util/value_encoding/mod.rs     |   6 +-
 src/expr/macro/src/types.rs                   |   1 +
 18 files changed, 1104 insertions(+), 13 deletions(-)
 create mode 100644 src/common/src/array/map_array.rs

diff --git a/docs/dev/src/design/data-model-and-encoding.md b/docs/dev/src/design/data-model-and-encoding.md
index c2a70c40909c6..866f8c628e40a 100644
--- a/docs/dev/src/design/data-model-and-encoding.md
+++ b/docs/dev/src/design/data-model-and-encoding.md
@@ -25,12 +25,21 @@ Composite data types:
 
 - `Struct`: A structure with a list of named, strong-typed fields.
 - `List`: A variable-length list of values with same data type.
+- `Map`:
+
+`for_all_variants`
 
 ## In-Memory Encoding
 
+### Array
+
 > Source files: `common/src/array`
 
-In-memory data is encoded in arrays for vectorized execution. For variable-length data like strings, generally we use another offset array to mark the start of encoded values in a byte buffer.
+- columnar
+- array and scalar
+- difference between arrow (see ToArrow trait)
+
+In-memory data is encoded in **arrays** for vectorized execution. For variable-length data like strings, generally we use another offset array to mark the start of encoded values in a byte buffer.
 
 A Data Chunk consists of multiple columns and a visibility array, as is shown in the left subgraph below. The visibility array marks each row as visible or not. This helps filtering some rows while keeping other data arrays unchanged.
 
@@ -38,9 +47,12 @@ A Stream Chunk consists of columns, visibility array and an additional `ops` col
 
 ![chunk](../images/data-model-and-encoding/chunk.svg)
 
+### Hash Encoding
+
 ## On-Disk Encoding
 
-> Source files: `utils/memcomparable`, `utils/value-encoding`
+- key encoding: [memcomparable](https://docs.rs/memcomparable/latest/memcomparable/)
+- value encoding: `util/value_encoding`
 
 RisingWave stores user data in shared key-value storage called 'Hummock'. Tables, materialized views and checkpoints of internal streaming operators are encoded into key-value entries. Every field of a row, a.k.a. cell, is encoded as a key-value entry, except that `NULL` values are omitted.
 
diff --git a/proto/data.proto b/proto/data.proto
index 06d223d142bf9..9bb15ebcc8d62 100644
--- a/proto/data.proto
+++ b/proto/data.proto
@@ -52,6 +52,7 @@ message DataType {
     JSONB = 18;
     SERIAL = 19;
     INT256 = 20;
+    MAP = 21;
   }
   TypeName type_name = 1;
   // Data length for char.
@@ -102,6 +103,7 @@ enum ArrayType {
   JSONB = 16;
   SERIAL = 17;
   INT256 = 18;
+  MAP = 20;
 }
 
 message Array {
diff --git a/src/common/src/array/arrow/arrow_impl.rs b/src/common/src/array/arrow/arrow_impl.rs
index 2ecef10e7aa3f..77b5a39cc1561 100644
--- a/src/common/src/array/arrow/arrow_impl.rs
+++ b/src/common/src/array/arrow/arrow_impl.rs
@@ -42,6 +42,7 @@
 
 use std::fmt::Write;
 
+use arrow_array_iceberg::array;
 use arrow_buffer::OffsetBuffer;
 use chrono::{DateTime, NaiveDateTime, NaiveTime};
 use itertools::Itertools;
@@ -113,6 +114,7 @@ pub trait ToArrow {
             ArrayImpl::Serial(array) => self.serial_to_arrow(array),
             ArrayImpl::List(array) => self.list_to_arrow(data_type, array),
             ArrayImpl::Struct(array) => self.struct_to_arrow(data_type, array),
+            ArrayImpl::Map(array) => self.map_to_arrow(data_type, array),
         }?;
         if arrow_array.data_type() != data_type {
             arrow_cast::cast(&arrow_array, data_type).map_err(ArrayError::to_arrow)
@@ -267,6 +269,27 @@ pub trait ToArrow {
         )))
     }
 
+    #[inline]
+    fn map_to_arrow(
+        &self,
+        data_type: &arrow_schema::DataType,
+        array: &MapArray,
+    ) -> Result<arrow_array::ArrayRef, ArrayError> {
+        todo!()
+        // let arrow_schema::DataType::List(field) = data_type else {
+        //     return Err(ArrayError::to_arrow("Invalid list type"));
+        // };
+        // let values = self.to_array(field.data_type(), array.values())?;
+        // let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
+        // let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
+        // Ok(Arc::new(arrow_array::ListArray::new(
+        //     field.clone(),
+        //     offsets,
+        //     values,
+        //     nulls,
+        // )))
+    }
+
     /// Convert RisingWave data type to Arrow data type.
     ///
     /// This function returns a `Field` instead of `DataType` because some may be converted to
@@ -297,6 +320,7 @@ pub trait ToArrow {
             DataType::Jsonb => return Ok(self.jsonb_type_to_arrow(name)),
             DataType::Struct(fields) => self.struct_type_to_arrow(fields)?,
             DataType::List(datatype) => self.list_type_to_arrow(datatype)?,
+            DataType::Map(datatype) => self.map_type_to_arrow(datatype)?,
         };
         Ok(arrow_schema::Field::new(name, data_type, true))
     }
@@ -413,6 +437,19 @@ pub trait ToArrow {
                 .try_collect::<_, _, ArrayError>()?,
         ))
     }
+
+    #[inline]
+    fn map_type_to_arrow(&self, elem_type: &MapType) -> Result<arrow_schema::DataType, ArrayError> {
+        let sorted = false;
+        let key = todo!();
+        let value = todo!();
+        let struct_ = arrow_schema::DataType::Struct([key, value].into());
+        Ok(arrow_schema::DataType::Map(
+            todo!(),
+            // Arc::new(self.to_arrow_field("item", struct_)?),
+            sorted,
+        ))
+    }
 }
 
 /// Defines how to convert Arrow arrays to RisingWave arrays.
diff --git a/src/common/src/array/map_array.rs b/src/common/src/array/map_array.rs
new file mode 100644
index 0000000000000..f785e4e7ff6a5
--- /dev/null
+++ b/src/common/src/array/map_array.rs
@@ -0,0 +1,936 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::borrow::Cow;
+use std::cmp::Ordering;
+use std::fmt::{self, Debug, Display};
+use std::future::Future;
+use std::mem::size_of;
+
+use bytes::{Buf, BufMut};
+use itertools::Itertools;
+use risingwave_common_estimate_size::EstimateSize;
+use risingwave_pb::data::{ListArrayData, PbArray, PbArrayType};
+use serde::{Deserialize, Serializer};
+use thiserror_ext::AsReport;
+
+use super::{
+    Array, ArrayBuilder, ArrayBuilderImpl, ArrayImpl, ArrayResult, BoolArray, PrimitiveArray,
+    PrimitiveArrayItemType, RowRef, Utf8Array,
+};
+use crate::bitmap::{Bitmap, BitmapBuilder};
+use crate::row::Row;
+use crate::types::{
+    hash_datum, DataType, Datum, DatumRef, DefaultOrd, Scalar, ScalarImpl, ScalarRefImpl,
+    ToDatumRef, ToText,
+};
+use crate::util::memcmp_encoding;
+use crate::util::value_encoding::estimate_serialize_datum_size;
+
+#[derive(Debug, Clone, EstimateSize)]
+pub struct MapArrayBuilder {
+    bitmap: BitmapBuilder,
+    offsets: Vec<u32>,
+    value: Box<ArrayBuilderImpl>,
+    len: usize,
+}
+
+impl ArrayBuilder for MapArrayBuilder {
+    type ArrayType = MapArray;
+
+    #[cfg(not(test))]
+    fn new(_capacity: usize) -> Self {
+        panic!("please use `MapArrayBuilder::with_type` instead");
+    }
+
+    #[cfg(test)]
+    fn new(capacity: usize) -> Self {
+        todo!()
+
+        // Self::with_type(
+        //     capacity,
+        //     // Default datatype
+        //     DataType::Map(Box::new(DataType::Int16)),
+        // )
+    }
+
+    fn with_type(capacity: usize, ty: DataType) -> Self {
+        todo!()
+        // let DataType::Map(value_type) = ty else {
+        //     panic!("data type must be DataType::Map");
+        // };
+        // let mut offsets = Vec::with_capacity(capacity + 1);
+        // offsets.push(0);
+        // Self {
+        //     bitmap: BitmapBuilder::with_capacity(capacity),
+        //     offsets,
+        //     value: Box::new(value_type.create_array_builder(capacity)),
+        //     len: 0,
+        // }
+    }
+
+    fn append_n(&mut self, n: usize, value: Option<MapRef<'_>>) {
+        match value {
+            None => {
+                self.bitmap.append_n(n, false);
+                let last = *self.offsets.last().unwrap();
+                for _ in 0..n {
+                    self.offsets.push(last);
+                }
+            }
+            Some(v) => {
+                self.bitmap.append_n(n, true);
+                for _ in 0..n {
+                    let last = *self.offsets.last().unwrap();
+                    let elems = v.iter();
+                    self.offsets.push(
+                        last.checked_add(elems.len() as u32)
+                            .expect("offset overflow"),
+                    );
+                    for elem in elems {
+                        self.value.append(elem);
+                    }
+                }
+            }
+        }
+        self.len += n;
+    }
+
+    fn append_array(&mut self, other: &MapArray) {
+        self.bitmap.append_bitmap(&other.bitmap);
+        let last = *self.offsets.last().unwrap();
+        self.offsets
+            .append(&mut other.offsets[1..].iter().map(|o| *o + last).collect());
+        self.value.append_array(&other.value);
+        self.len += other.len();
+    }
+
+    fn pop(&mut self) -> Option<()> {
+        self.bitmap.pop()?;
+        let start = self.offsets.pop().unwrap();
+        let end = *self.offsets.last().unwrap();
+        self.len -= 1;
+        for _ in end..start {
+            self.value.pop().unwrap();
+        }
+        Some(())
+    }
+
+    fn len(&self) -> usize {
+        self.bitmap.len()
+    }
+
+    fn finish(self) -> MapArray {
+        MapArray {
+            bitmap: self.bitmap.finish(),
+            offsets: self.offsets.into(),
+            value: Box::new(self.value.finish()),
+        }
+    }
+}
+
+impl MapArrayBuilder {
+    pub fn append_row_ref(&mut self, row: RowRef<'_>) {
+        self.bitmap.append(true);
+        let last = *self.offsets.last().unwrap();
+        self.offsets
+            .push(last.checked_add(row.len() as u32).expect("offset overflow"));
+        self.len += 1;
+        for v in row.iter() {
+            self.value.append(v);
+        }
+    }
+}
+
+/// Each item of this `MapArray` is a `Map<T>`, or called `T[]` (T array).
+///
+/// * As other arrays, there is a null bitmap, with `1` meaning nonnull and `0` meaning null.
+/// * As [`super::BytesArray`], there is an offsets `Vec` and a value `Array`. The value `Array` has
+///   all items concatenated, and the offsets `Vec` stores start and end indices into it for
+///   slicing. Effectively, the inner array is the flattened form, and `offsets.len() == n + 1`.
+///
+/// For example, `values (array[1]), (array[]::int[]), (null), (array[2, 3]);` stores an inner
+///  `I32Array` with `[1, 2, 3]`, along with offsets `[0, 1, 1, 1, 3]` and null bitmap `TTFT`.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct MapArray {
+    pub(super) bitmap: Bitmap,
+    pub(super) offsets: Box<[u32]>,
+    pub(super) value: Box<ArrayImpl>,
+}
+
+impl EstimateSize for MapArray {
+    fn estimated_heap_size(&self) -> usize {
+        self.bitmap.estimated_heap_size()
+            + self.offsets.len() * size_of::<u32>()
+            + self.value.estimated_size()
+    }
+}
+
+impl Array for MapArray {
+    type Builder = MapArrayBuilder;
+    type OwnedItem = MapValue;
+    type RefItem<'a> = MapRef<'a>;
+
+    unsafe fn raw_value_at_unchecked(&self, idx: usize) -> Self::RefItem<'_> {
+        MapRef {
+            array: &self.value,
+            start: *self.offsets.get_unchecked(idx),
+            end: *self.offsets.get_unchecked(idx + 1),
+        }
+    }
+
+    fn len(&self) -> usize {
+        self.bitmap.len()
+    }
+
+    fn to_protobuf(&self) -> PbArray {
+        todo!()
+        // let value = self.value.to_protobuf();
+        // PbArray {
+        //     array_type: PbArrayType::Map as i32,
+        //     struct_array_data: None,
+        //     list_array_data: Some(Box::new(MapArrayData {
+        //         offsets: self.offsets.to_vec(),
+        //         value: Some(Box::new(value)),
+        //         value_type: Some(self.value.data_type().to_protobuf()),
+        //     })),
+        //     null_bitmap: Some(self.bitmap.to_protobuf()),
+        //     values: vec![],
+        // }
+    }
+
+    fn null_bitmap(&self) -> &Bitmap {
+        &self.bitmap
+    }
+
+    fn into_null_bitmap(self) -> Bitmap {
+        self.bitmap
+    }
+
+    fn set_bitmap(&mut self, bitmap: Bitmap) {
+        self.bitmap = bitmap;
+    }
+
+    fn data_type(&self) -> DataType {
+        todo!()
+        // DataType::Map(Box::new(self.value.data_type()))
+    }
+}
+
+impl MapArray {
+    /// Flatten the list array into a single array.
+    ///
+    /// # Example
+    ///
+    /// ```text
+    /// [[1,2,3],NULL,[4,5]] => [1,2,3,4,5]
+    /// [[[1],[2]],[[3],[4]]] => [1,2,3,4]
+    /// ```
+    pub fn flatten(&self) -> ArrayImpl {
+        match &*self.value {
+            ArrayImpl::Map(inner) => inner.flatten(),
+            a => a.clone(),
+        }
+    }
+
+    /// Return the inner array of the list array.
+    pub fn values(&self) -> &ArrayImpl {
+        &self.value
+    }
+
+    pub fn from_protobuf(array: &PbArray) -> ArrayResult<ArrayImpl> {
+        ensure!(
+            array.values.is_empty(),
+            "Must have no buffer in a list array"
+        );
+        let bitmap: Bitmap = array.get_null_bitmap()?.into();
+        let array_data = array.get_list_array_data()?.to_owned();
+        let flatten_len = match array_data.offsets.last() {
+            Some(&n) => n as usize,
+            None => bail!("Must have at least one element in offsets"),
+        };
+        let value = ArrayImpl::from_protobuf(array_data.value.as_ref().unwrap(), flatten_len)?;
+        let arr = MapArray {
+            bitmap,
+            offsets: array_data.offsets.into(),
+            value: Box::new(value),
+        };
+        Ok(arr.into())
+    }
+
+    /// Apply the function on the underlying elements.
+    /// e.g. `map_inner([[1,2,3],NULL,[4,5]], DOUBLE) = [[2,4,6],NULL,[8,10]]`
+    pub async fn map_inner<E, Fut, F>(self, f: F) -> std::result::Result<MapArray, E>
+    where
+        F: FnOnce(ArrayImpl) -> Fut,
+        Fut: Future<Output = std::result::Result<ArrayImpl, E>>,
+    {
+        let new_value = (f)(*self.value).await?;
+
+        Ok(Self {
+            offsets: self.offsets,
+            bitmap: self.bitmap,
+            value: Box::new(new_value),
+        })
+    }
+
+    /// Returns the offsets of this list.
+    ///
+    /// # Example
+    /// ```text
+    /// list    = [[a, b, c], [], NULL, [d], [NULL, f]]
+    /// offsets = [0, 3, 3, 3, 4, 6]
+    /// ```
+    pub fn offsets(&self) -> &[u32] {
+        &self.offsets
+    }
+}
+
+impl<T, L> FromIterator<Option<L>> for MapArray
+where
+    T: PrimitiveArrayItemType,
+    L: IntoIterator<Item = T>,
+{
+    fn from_iter<I: IntoIterator<Item = Option<L>>>(iter: I) -> Self {
+        todo!()
+        // let iter = iter.into_iter();
+        // let mut builder = MapArrayBuilder::with_type(
+        //     iter.size_hint().0,
+        //     DataType::Map(Box::new(T::DATA_TYPE.clone())),
+        // );
+        // for v in iter {
+        //     match v {
+        //         None => builder.append(None),
+        //         Some(v) => {
+        //             builder.append(Some(v.into_iter().collect::<MapValue>().as_scalar_ref()))
+        //         }
+        //     }
+        // }
+        // builder.finish()
+    }
+}
+
+impl FromIterator<MapValue> for MapArray {
+    fn from_iter<I: IntoIterator<Item = MapValue>>(iter: I) -> Self {
+        todo!()
+        // let mut iter = iter.into_iter();
+        // let first = iter.next().expect("empty iterator");
+        // let mut builder = MapArrayBuilder::with_type(
+        //     iter.size_hint().0,
+        //     DataType::Map(Box::new(first.data_type())),
+        // );
+        // builder.append(Some(first.as_scalar_ref()));
+        // for v in iter {
+        //     builder.append(Some(v.as_scalar_ref()));
+        // }
+        // builder.finish()
+    }
+}
+
+#[derive(Clone, PartialEq, Eq, EstimateSize)]
+pub struct MapValue {
+    values: Box<ArrayImpl>,
+}
+
+impl Debug for MapValue {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        self.as_scalar_ref().fmt(f)
+    }
+}
+
+impl Display for MapValue {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        self.as_scalar_ref().write(f)
+    }
+}
+
+impl MapValue {
+    pub fn new(values: ArrayImpl) -> Self {
+        Self {
+            values: Box::new(values),
+        }
+    }
+
+    pub fn into_array(self) -> ArrayImpl {
+        *self.values
+    }
+
+    pub fn empty(datatype: &DataType) -> Self {
+        Self::new(datatype.create_array_builder(0).finish())
+    }
+
+    /// Creates a new `MapValue` from an iterator of `Datum`.
+    pub fn from_datum_iter<T: ToDatumRef>(
+        datatype: &DataType,
+        iter: impl IntoIterator<Item = T>,
+    ) -> Self {
+        let iter = iter.into_iter();
+        let mut builder = datatype.create_array_builder(iter.size_hint().0);
+        for datum in iter {
+            builder.append(datum);
+        }
+        Self::new(builder.finish())
+    }
+
+    /// Returns the length of the list.
+    pub fn len(&self) -> usize {
+        self.values.len()
+    }
+
+    /// Returns `true` if the list has a length of 0.
+    pub fn is_empty(&self) -> bool {
+        self.values.is_empty()
+    }
+
+    /// Iterates over the elements of the list.
+    pub fn iter(&self) -> impl DoubleEndedIterator + ExactSizeIterator<Item = DatumRef<'_>> {
+        self.values.iter()
+    }
+
+    /// Get the element at the given index. Returns `None` if the index is out of bounds.
+    pub fn get(&self, index: usize) -> Option<DatumRef<'_>> {
+        if index < self.len() {
+            Some(self.values.value_at(index))
+        } else {
+            None
+        }
+    }
+
+    /// Returns the data type of the elements in the list.
+    pub fn data_type(&self) -> DataType {
+        self.values.data_type()
+    }
+
+    pub fn memcmp_deserialize(
+        datatype: &DataType,
+        deserializer: &mut memcomparable::Deserializer<impl Buf>,
+    ) -> memcomparable::Result<Self> {
+        let bytes = serde_bytes::ByteBuf::deserialize(deserializer)?;
+        let mut inner_deserializer = memcomparable::Deserializer::new(bytes.as_slice());
+        let mut builder = datatype.create_array_builder(0);
+        while inner_deserializer.has_remaining() {
+            builder.append(memcmp_encoding::deserialize_datum_in_composite(
+                datatype,
+                &mut inner_deserializer,
+            )?)
+        }
+        Ok(Self::new(builder.finish()))
+    }
+
+    // Used to display MapValue in explain for better readibilty.
+    pub fn display_for_explain(&self) -> String {
+        // Example of MapValue display: ARRAY[1, 2, null]
+        format!(
+            "ARRAY[{}]",
+            self.iter()
+                .map(|v| {
+                    match v.as_ref() {
+                        None => "null".into(),
+                        Some(scalar) => scalar.to_text(),
+                    }
+                })
+                .format(", ")
+        )
+    }
+
+    /// Returns a mutable slice if the list is of type `int64[]`.
+    pub fn as_i64_mut_slice(&mut self) -> Option<&mut [i64]> {
+        match self.values.as_mut() {
+            ArrayImpl::Int64(array) => Some(array.as_mut_slice()),
+            _ => None,
+        }
+    }
+}
+
+impl PartialOrd for MapValue {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for MapValue {
+    fn cmp(&self, other: &Self) -> Ordering {
+        self.as_scalar_ref().cmp(&other.as_scalar_ref())
+    }
+}
+
+impl<T: PrimitiveArrayItemType> FromIterator<Option<T>> for MapValue {
+    fn from_iter<I: IntoIterator<Item = Option<T>>>(iter: I) -> Self {
+        Self::new(iter.into_iter().collect::<PrimitiveArray<T>>().into())
+    }
+}
+
+impl<T: PrimitiveArrayItemType> FromIterator<T> for MapValue {
+    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
+        Self::new(iter.into_iter().collect::<PrimitiveArray<T>>().into())
+    }
+}
+
+impl FromIterator<bool> for MapValue {
+    fn from_iter<I: IntoIterator<Item = bool>>(iter: I) -> Self {
+        Self::new(iter.into_iter().collect::<BoolArray>().into())
+    }
+}
+
+impl<'a> FromIterator<Option<&'a str>> for MapValue {
+    fn from_iter<I: IntoIterator<Item = Option<&'a str>>>(iter: I) -> Self {
+        Self::new(iter.into_iter().collect::<Utf8Array>().into())
+    }
+}
+
+impl<'a> FromIterator<&'a str> for MapValue {
+    fn from_iter<I: IntoIterator<Item = &'a str>>(iter: I) -> Self {
+        Self::new(iter.into_iter().collect::<Utf8Array>().into())
+    }
+}
+
+impl FromIterator<MapValue> for MapValue {
+    fn from_iter<I: IntoIterator<Item = MapValue>>(iter: I) -> Self {
+        Self::new(iter.into_iter().collect::<MapArray>().into())
+    }
+}
+
+impl From<MapValue> for ArrayImpl {
+    fn from(value: MapValue) -> Self {
+        *value.values
+    }
+}
+
+#[derive(Copy, Clone)]
+pub struct MapRef<'a> {
+    array: &'a ArrayImpl,
+    start: u32,
+    end: u32,
+}
+
+impl<'a> MapRef<'a> {
+    /// Returns the length of the list.
+    pub fn len(&self) -> usize {
+        (self.end - self.start) as usize
+    }
+
+    /// Returns `true` if the list has a length of 0.
+    pub fn is_empty(&self) -> bool {
+        self.start == self.end
+    }
+
+    /// Returns the data type of the elements in the list.
+    pub fn data_type(&self) -> DataType {
+        self.array.data_type()
+    }
+
+    /// Returns the elements in the flattened list.
+    pub fn flatten(self) -> MapRef<'a> {
+        match self.array {
+            ArrayImpl::Map(inner) => MapRef {
+                array: &inner.value,
+                start: inner.offsets[self.start as usize],
+                end: inner.offsets[self.end as usize],
+            }
+            .flatten(),
+            _ => self,
+        }
+    }
+
+    /// Iterates over the elements of the list.
+    pub fn iter(self) -> impl DoubleEndedIterator + ExactSizeIterator<Item = DatumRef<'a>> + 'a {
+        (self.start..self.end).map(|i| self.array.value_at(i as usize))
+    }
+
+    /// Get the element at the given index. Returns `None` if the index is out of bounds.
+    pub fn get(self, index: usize) -> Option<DatumRef<'a>> {
+        if index < self.len() {
+            Some(self.array.value_at(self.start as usize + index))
+        } else {
+            None
+        }
+    }
+
+    pub fn memcmp_serialize(
+        self,
+        serializer: &mut memcomparable::Serializer<impl BufMut>,
+    ) -> memcomparable::Result<()> {
+        let mut inner_serializer = memcomparable::Serializer::new(vec![]);
+        for datum_ref in self.iter() {
+            memcmp_encoding::serialize_datum_in_composite(datum_ref, &mut inner_serializer)?
+        }
+        serializer.serialize_bytes(&inner_serializer.into_inner())
+    }
+
+    pub fn hash_scalar_inner<H: std::hash::Hasher>(self, state: &mut H) {
+        for datum_ref in self.iter() {
+            hash_datum(datum_ref, state);
+        }
+    }
+
+    /// estimate the serialized size with value encoding
+    pub fn estimate_serialize_size_inner(self) -> usize {
+        self.iter().map(estimate_serialize_datum_size).sum()
+    }
+
+    pub fn to_owned(self) -> MapValue {
+        let mut builder = self.array.create_builder(self.len());
+        for datum_ref in self.iter() {
+            builder.append(datum_ref);
+        }
+        MapValue::new(builder.finish())
+    }
+
+    /// Returns a slice if the list is of type `int64[]`.
+    pub fn as_i64_slice(&self) -> Option<&[i64]> {
+        match &self.array {
+            ArrayImpl::Int64(array) => {
+                Some(&array.as_slice()[self.start as usize..self.end as usize])
+            }
+            _ => None,
+        }
+    }
+}
+
+impl PartialEq for MapRef<'_> {
+    fn eq(&self, other: &Self) -> bool {
+        self.iter().eq(other.iter())
+    }
+}
+
+impl Eq for MapRef<'_> {}
+
+impl PartialOrd for MapRef<'_> {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for MapRef<'_> {
+    fn cmp(&self, other: &Self) -> Ordering {
+        self.iter().cmp_by(other.iter(), |a, b| a.default_cmp(&b))
+    }
+}
+
+impl Debug for MapRef<'_> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_list().entries(self.iter()).finish()
+    }
+}
+
+impl Row for MapRef<'_> {
+    fn datum_at(&self, index: usize) -> DatumRef<'_> {
+        self.array.value_at(self.start as usize + index)
+    }
+
+    unsafe fn datum_at_unchecked(&self, index: usize) -> DatumRef<'_> {
+        self.array.value_at_unchecked(self.start as usize + index)
+    }
+
+    fn len(&self) -> usize {
+        self.len()
+    }
+
+    fn iter(&self) -> impl Iterator<Item = DatumRef<'_>> {
+        (*self).iter()
+    }
+}
+
+impl ToText for MapRef<'_> {
+    // This function will be invoked when pgwire prints a list value in string.
+    // Refer to PostgreSQL `array_out` or `appendPGArray`.
+    fn write<W: std::fmt::Write>(&self, f: &mut W) -> std::fmt::Result {
+        write!(
+            f,
+            "{{{}}}",
+            self.iter().format_with(",", |datum_ref, f| {
+                let s = datum_ref.to_text();
+                // Never quote null or inner list, but quote empty, verbatim 'null', special
+                // chars and whitespaces.
+                let need_quote = !matches!(datum_ref, None | Some(ScalarRefImpl::Map(_)))
+                    && (s.is_empty()
+                        || s.to_ascii_lowercase() == "null"
+                        || s.contains([
+                            '"', '\\', '{', '}', ',',
+                            // PostgreSQL `array_isspace` includes '\x0B' but rust
+                            // [`char::is_ascii_whitespace`] does not.
+                            ' ', '\t', '\n', '\r', '\x0B', '\x0C',
+                        ]));
+                if need_quote {
+                    f(&"\"")?;
+                    s.chars().try_for_each(|c| {
+                        if c == '"' || c == '\\' {
+                            f(&"\\")?;
+                        }
+                        f(&c)
+                    })?;
+                    f(&"\"")
+                } else {
+                    f(&s)
+                }
+            })
+        )
+    }
+
+    fn write_with_type<W: std::fmt::Write>(&self, ty: &DataType, f: &mut W) -> std::fmt::Result {
+        match ty {
+            DataType::Map { .. } => self.write(f),
+            _ => unreachable!(),
+        }
+    }
+}
+
+impl<'a> From<&'a MapValue> for MapRef<'a> {
+    fn from(value: &'a MapValue) -> Self {
+        MapRef {
+            array: &value.values,
+            start: 0,
+            end: value.len() as u32,
+        }
+    }
+}
+
+impl From<MapRef<'_>> for MapValue {
+    fn from(value: MapRef<'_>) -> Self {
+        value.to_owned()
+    }
+}
+
+impl MapValue {
+    /// Construct an array from literal string.
+    pub fn from_str(input: &str, data_type: &DataType) -> Result<Self, String> {
+        struct Parser<'a> {
+            input: &'a str,
+            data_type: &'a DataType,
+        }
+
+        impl Parser<'_> {
+            /// Parse a datum.
+            fn parse(&mut self) -> Result<Datum, String> {
+                self.skip_whitespace();
+                if self.data_type.is_array() {
+                    if self.try_parse_null() {
+                        return Ok(None);
+                    }
+                    Ok(Some(self.parse_array()?.into()))
+                } else {
+                    self.parse_value()
+                }
+            }
+
+            /// Parse an array.
+            fn parse_array(&mut self) -> Result<MapValue, String> {
+                self.skip_whitespace();
+                if !self.try_consume('{') {
+                    return Err("Array value must start with \"{\"".to_string());
+                }
+                self.skip_whitespace();
+                if self.try_consume('}') {
+                    return Ok(MapValue::empty(self.data_type.as_list()));
+                }
+                let mut builder = ArrayBuilderImpl::with_type(0, self.data_type.as_list().clone());
+                loop {
+                    let mut parser = Self {
+                        input: self.input,
+                        data_type: self.data_type.as_list(),
+                    };
+                    builder.append(parser.parse()?);
+                    self.input = parser.input;
+
+                    // expect ',' or '}'
+                    self.skip_whitespace();
+                    match self.peek() {
+                        Some(',') => {
+                            self.try_consume(',');
+                        }
+                        Some('}') => {
+                            self.try_consume('}');
+                            break;
+                        }
+                        None => return Err(Self::eoi()),
+                        _ => return Err("Unexpected array element.".to_string()),
+                    }
+                }
+                Ok(MapValue::new(builder.finish()))
+            }
+
+            /// Parse a non-array value.
+            fn parse_value(&mut self) -> Result<Datum, String> {
+                if self.peek() == Some('"') {
+                    return Ok(Some(self.parse_quoted()?));
+                }
+                // peek until the next unescaped ',' or '}'
+                let mut chars = self.input.char_indices();
+                let mut has_escape = false;
+                let s = loop {
+                    match chars.next().ok_or_else(Self::eoi)? {
+                        (_, '\\') => {
+                            has_escape = true;
+                            chars.next().ok_or_else(Self::eoi)?;
+                        }
+                        (i, c @ ',' | c @ '}') => {
+                            let s = &self.input[..i];
+                            // consume the value and leave the ',' or '}' for parent
+                            self.input = &self.input[i..];
+
+                            break if has_escape {
+                                Cow::Owned(Self::unescape_trim_end(s))
+                            } else {
+                                let trimmed = s.trim_end();
+                                if trimmed.is_empty() {
+                                    return Err(format!("Unexpected \"{c}\" character."));
+                                }
+                                if trimmed.eq_ignore_ascii_case("null") {
+                                    return Ok(None);
+                                }
+                                Cow::Borrowed(trimmed)
+                            };
+                        }
+                        (_, '{') => return Err("Unexpected \"{\" character.".to_string()),
+                        (_, '"') => return Err("Unexpected array element.".to_string()),
+                        _ => {}
+                    }
+                };
+                Ok(Some(
+                    ScalarImpl::from_text(&s, self.data_type).map_err(|e| e.to_report_string())?,
+                ))
+            }
+
+            /// Parse a double quoted non-array value.
+            fn parse_quoted(&mut self) -> Result<ScalarImpl, String> {
+                assert!(self.try_consume('"'));
+                // peek until the next unescaped '"'
+                let mut chars = self.input.char_indices();
+                let mut has_escape = false;
+                let s = loop {
+                    match chars.next().ok_or_else(Self::eoi)? {
+                        (_, '\\') => {
+                            has_escape = true;
+                            chars.next().ok_or_else(Self::eoi)?;
+                        }
+                        (i, '"') => {
+                            let s = &self.input[..i];
+                            self.input = &self.input[i + 1..];
+                            break if has_escape {
+                                Cow::Owned(Self::unescape(s))
+                            } else {
+                                Cow::Borrowed(s)
+                            };
+                        }
+                        _ => {}
+                    }
+                };
+                ScalarImpl::from_text(&s, self.data_type).map_err(|e| e.to_report_string())
+            }
+
+            /// Unescape a string.
+            fn unescape(s: &str) -> String {
+                let mut unescaped = String::with_capacity(s.len());
+                let mut chars = s.chars();
+                while let Some(mut c) = chars.next() {
+                    if c == '\\' {
+                        c = chars.next().unwrap();
+                    }
+                    unescaped.push(c);
+                }
+                unescaped
+            }
+
+            /// Unescape a string and trim the trailing whitespaces.
+            ///
+            /// Example: `"\  " -> " "`
+            fn unescape_trim_end(s: &str) -> String {
+                let mut unescaped = String::with_capacity(s.len());
+                let mut chars = s.chars();
+                let mut len_after_last_escaped_char = 0;
+                while let Some(mut c) = chars.next() {
+                    if c == '\\' {
+                        c = chars.next().unwrap();
+                        unescaped.push(c);
+                        len_after_last_escaped_char = unescaped.len();
+                    } else {
+                        unescaped.push(c);
+                    }
+                }
+                let l = unescaped[len_after_last_escaped_char..].trim_end().len();
+                unescaped.truncate(len_after_last_escaped_char + l);
+                unescaped
+            }
+
+            /// Consume the next 4 characters if it matches "null".
+            ///
+            /// Note: We don't use this function when parsing non-array values.
+            ///       Because we can't decide whether it is a null value or a string starts with "null".
+            ///       Consider this case: `{null value}` => `["null value"]`
+            fn try_parse_null(&mut self) -> bool {
+                if let Some(s) = self.input.get(..4)
+                    && s.eq_ignore_ascii_case("null")
+                {
+                    let next_char = self.input[4..].chars().next();
+                    match next_char {
+                        None | Some(',' | '}') => {}
+                        Some(c) if c.is_ascii_whitespace() => {}
+                        // following normal characters
+                        _ => return false,
+                    }
+                    self.input = &self.input[4..];
+                    true
+                } else {
+                    false
+                }
+            }
+
+            /// Consume the next character if it matches `c`.
+            fn try_consume(&mut self, c: char) -> bool {
+                if self.peek() == Some(c) {
+                    self.input = &self.input[c.len_utf8()..];
+                    true
+                } else {
+                    false
+                }
+            }
+
+            /// Expect end of input.
+            fn expect_end(&mut self) -> Result<(), String> {
+                self.skip_whitespace();
+                match self.peek() {
+                    Some(_) => Err("Junk after closing right brace.".to_string()),
+                    None => Ok(()),
+                }
+            }
+
+            /// Skip whitespaces.
+            fn skip_whitespace(&mut self) {
+                self.input = match self
+                    .input
+                    .char_indices()
+                    .find(|(_, c)| !c.is_ascii_whitespace())
+                {
+                    Some((i, _)) => &self.input[i..],
+                    None => "",
+                };
+            }
+
+            /// Peek the next character.
+            fn peek(&self) -> Option<char> {
+                self.input.chars().next()
+            }
+
+            /// Return the error message for unexpected end of input.
+            fn eoi() -> String {
+                "Unexpected end of input.".into()
+            }
+        }
+
+        let mut parser = Parser { input, data_type };
+        let array = parser.parse_array()?;
+        parser.expect_end()?;
+        Ok(array)
+    }
+}
diff --git a/src/common/src/array/mod.rs b/src/common/src/array/mod.rs
index f2ae95aa71efb..97eb222cc67c9 100644
--- a/src/common/src/array/mod.rs
+++ b/src/common/src/array/mod.rs
@@ -26,6 +26,7 @@ pub mod interval_array;
 mod iterator;
 mod jsonb_array;
 pub mod list_array;
+mod map_array;
 mod num256_array;
 mod primitive_array;
 mod proto_reader;
@@ -54,6 +55,7 @@ pub use interval_array::{IntervalArray, IntervalArrayBuilder};
 pub use iterator::ArrayIterator;
 pub use jsonb_array::{JsonbArray, JsonbArrayBuilder};
 pub use list_array::{ListArray, ListArrayBuilder, ListRef, ListValue};
+pub use map_array::{MapArray, MapArrayBuilder, MapRef, MapValue};
 use paste::paste;
 pub use primitive_array::{PrimitiveArray, PrimitiveArrayBuilder, PrimitiveArrayItemType};
 use risingwave_common_estimate_size::EstimateSize;
@@ -332,6 +334,8 @@ macro_rules! array_impl_enum {
 
 for_all_array_variants! { array_impl_enum }
 
+// XXX: We can merge the From<XxArray> impl into impl_convert
+
 impl<T: PrimitiveArrayItemType> From<PrimitiveArray<T>> for ArrayImpl {
     fn from(arr: PrimitiveArray<T>) -> Self {
         T::erase_array_type(arr)
@@ -380,6 +384,12 @@ impl From<BytesArray> for ArrayImpl {
     }
 }
 
+impl From<MapArray> for ArrayImpl {
+    fn from(arr: MapArray) -> Self {
+        Self::Map(arr)
+    }
+}
+
 /// `impl_convert` implements several conversions for `Array` and `ArrayBuilder`.
 /// * `ArrayImpl -> &Array` with `impl.as_int16()`.
 /// * `ArrayImpl -> Array` with `impl.into_int16()`.
diff --git a/src/common/src/array/proto_reader.rs b/src/common/src/array/proto_reader.rs
index 7c3b05437770c..a5e83ab3521ba 100644
--- a/src/common/src/array/proto_reader.rs
+++ b/src/common/src/array/proto_reader.rs
@@ -54,6 +54,7 @@ impl ArrayImpl {
                 read_string_array::<BytesArrayBuilder, BytesValueReader>(array, cardinality)?
             }
             PbArrayType::Int256 => Int256Array::from_protobuf(array, cardinality)?,
+            PbArrayType::Map => MapArray::from_protobuf(array)?,
         };
         Ok(array)
     }
diff --git a/src/common/src/hash/key.rs b/src/common/src/hash/key.rs
index e9f7e83ac9146..d08ed7c8c317c 100644
--- a/src/common/src/hash/key.rs
+++ b/src/common/src/hash/key.rs
@@ -33,7 +33,7 @@ use risingwave_common_estimate_size::EstimateSize;
 use smallbitset::Set64;
 use static_assertions::const_assert_eq;
 
-use crate::array::{ListValue, StructValue};
+use crate::array::{ListValue, MapValue, StructValue};
 use crate::types::{
     DataType, Date, Decimal, Int256, Int256Ref, JsonbVal, Scalar, ScalarRef, ScalarRefImpl, Serial,
     Time, Timestamp, Timestamptz, F32, F64,
@@ -627,6 +627,7 @@ impl_value_encoding_hash_key_serde!(JsonbVal);
 // use the memcmp encoding for safety.
 impl_memcmp_encoding_hash_key_serde!(StructValue);
 impl_memcmp_encoding_hash_key_serde!(ListValue);
+impl_memcmp_encoding_hash_key_serde!(MapValue);
 
 #[cfg(test)]
 mod tests {
diff --git a/src/common/src/test_utils/rand_array.rs b/src/common/src/test_utils/rand_array.rs
index b7c1d3630b9b7..338539ee5b89c 100644
--- a/src/common/src/test_utils/rand_array.rs
+++ b/src/common/src/test_utils/rand_array.rs
@@ -24,7 +24,7 @@ use rand::prelude::Distribution;
 use rand::rngs::SmallRng;
 use rand::{Rng, SeedableRng};
 
-use crate::array::{Array, ArrayBuilder, ArrayRef, ListValue, StructValue};
+use crate::array::{Array, ArrayBuilder, ArrayRef, ListValue, MapValue, StructValue};
 use crate::types::{
     Date, Decimal, Int256, Interval, JsonbVal, NativeType, Scalar, Serial, Time, Timestamp,
     Timestamptz,
@@ -151,6 +151,12 @@ impl RandValue for ListValue {
     }
 }
 
+impl RandValue for MapValue {
+    fn rand_value<R: Rng>(rand: &mut R) -> Self {
+        todo!()
+    }
+}
+
 pub fn rand_array<A, R>(rand: &mut R, size: usize, null_ratio: f64) -> A
 where
     A: Array,
diff --git a/src/common/src/test_utils/rand_chunk.rs b/src/common/src/test_utils/rand_chunk.rs
index 3e537fd9b6a49..9c604b6205cc3 100644
--- a/src/common/src/test_utils/rand_chunk.rs
+++ b/src/common/src/test_utils/rand_chunk.rs
@@ -43,10 +43,11 @@ pub fn gen_chunk(data_types: &[DataType], size: usize, seed: u64, null_ratio: f6
             }
             DataType::Interval => seed_rand_array_ref::<IntervalArray>(size, seed, null_ratio),
             DataType::Int256 => seed_rand_array_ref::<Int256Array>(size, seed, null_ratio),
-            DataType::Struct(_) | DataType::Bytea | DataType::Jsonb => {
-                todo!()
-            }
-            DataType::List(_) => {
+            DataType::Struct(_)
+            | DataType::Bytea
+            | DataType::Jsonb
+            | DataType::List(_)
+            | DataType::Map(_) => {
                 todo!()
             }
         });
diff --git a/src/common/src/types/macros.rs b/src/common/src/types/macros.rs
index 520e4ab8f45ee..1dd29156dd651 100644
--- a/src/common/src/types/macros.rs
+++ b/src/common/src/types/macros.rs
@@ -58,6 +58,7 @@ macro_rules! for_all_variants {
             { Serial,       Serial,       serial,       $crate::types::Serial,      $crate::types::Serial,              $crate::array::SerialArray,         $crate::array::SerialArrayBuilder       },
             { Struct,       Struct,       struct,       $crate::types::StructValue, $crate::types::StructRef<'scalar>,  $crate::array::StructArray,         $crate::array::StructArrayBuilder       },
             { List,         List,         list,         $crate::types::ListValue,   $crate::types::ListRef<'scalar>,    $crate::array::ListArray,           $crate::array::ListArrayBuilder         },
+            { Map,          Map,          map,          $crate::types::MapValue,    $crate::types::MapRef<'scalar>,     $crate::array::MapArray,            $crate::array::MapArrayBuilder         },
             { Bytea,        Bytea,        bytea,        Box<[u8]>,                  &'scalar [u8],                      $crate::array::BytesArray,          $crate::array::BytesArrayBuilder        }
         }
     };
diff --git a/src/common/src/types/mod.rs b/src/common/src/types/mod.rs
index ed424b4f4956c..c2cfb86fd9f6f 100644
--- a/src/common/src/types/mod.rs
+++ b/src/common/src/types/mod.rs
@@ -37,7 +37,8 @@ use thiserror_ext::AsReport;
 use crate::array::{
     ArrayBuilderImpl, ArrayError, ArrayResult, PrimitiveArrayItemType, NULL_VAL_FOR_HASH,
 };
-pub use crate::array::{ListRef, ListValue, StructRef, StructValue};
+// Complex type's value is based on the array
+pub use crate::array::{ListRef, ListValue, MapRef, MapValue, StructRef, StructValue};
 use crate::cast::{str_to_bool, str_to_bytea};
 use crate::error::BoxedError;
 use crate::{
@@ -166,6 +167,36 @@ pub enum DataType {
     #[display("rw_int256")]
     #[from_str(regex = "(?i)^rw_int256$")]
     Int256,
+    // FIXME: what is the syntax for map?
+    #[display("map{0}")]
+    #[from_str(regex = "(?i)^map(?P<0>.+)$")]
+    Map(MapType),
+}
+
+pub use map_type::MapType;
+mod map_type {
+    use std::fmt::Formatter;
+
+    use anyhow::*;
+
+    use super::*;
+    // TODO: check the trait impls
+    #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
+    pub struct MapType;
+
+    impl FromStr for MapType {
+        type Err = anyhow::Error;
+
+        fn from_str(s: &str) -> Result<Self, Self::Err> {
+            todo!()
+        }
+    }
+
+    impl std::fmt::Display for MapType {
+        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+            todo!()
+        }
+    }
 }
 
 impl std::str::FromStr for Box<DataType> {
@@ -200,7 +231,7 @@ impl TryFrom<DataTypeName> for DataType {
             DataTypeName::Time => Ok(DataType::Time),
             DataTypeName::Interval => Ok(DataType::Interval),
             DataTypeName::Jsonb => Ok(DataType::Jsonb),
-            DataTypeName::Struct | DataTypeName::List => {
+            DataTypeName::Struct | DataTypeName::List | DataTypeName::Map => {
                 Err("Functions returning struct or list can not be inferred. Please use `FunctionCall::new_unchecked`.")
             }
         }
@@ -236,6 +267,11 @@ impl From<&PbDataType> for DataType {
                 // The first (and only) item is the list element type.
                 Box::new((&proto.field_type[0]).into()),
             ),
+            PbTypeName::Map => DataType::Map(
+                // The first (and only) item is the list element type.
+                // Box::new((&proto.field_type[0]).into()),
+                todo!(),
+            ),
             PbTypeName::Int256 => DataType::Int256,
         }
     }
@@ -263,6 +299,7 @@ impl From<DataTypeName> for PbTypeName {
             DataTypeName::Struct => PbTypeName::Struct,
             DataTypeName::List => PbTypeName::List,
             DataTypeName::Int256 => PbTypeName::Int256,
+            DataTypeName::Map => PbTypeName::Map,
         }
     }
 }
@@ -324,6 +361,10 @@ impl DataType {
             DataType::List(datatype) => {
                 pb.field_type = vec![datatype.to_protobuf()];
             }
+            DataType::Map(datatype) => {
+                todo!()
+                // pb.field_type = vec![datatype.to_protobuf()];
+            }
             DataType::Boolean
             | DataType::Int16
             | DataType::Int32
@@ -447,6 +488,10 @@ impl From<DataType> for PbDataType {
 mod private {
     use super::*;
 
+    // Note: put pub trait inside a private mod just makes the name private,
+    // The trait methods will still be publicly available...
+    // a.k.a. ["Voldemort type"](https://rust-lang.github.io/rfcs/2145-type-privacy.html#lint-3-voldemort-types-its-reachable-but-i-cant-name-it)
+
     /// Common trait bounds of scalar and scalar reference types.
     ///
     /// NOTE(rc): `Hash` is not in the trait bound list, it's implemented as [`ScalarRef::hash_scalar`].
@@ -610,7 +655,7 @@ macro_rules! impl_self_as_scalar_ref {
         )*
     };
 }
-impl_self_as_scalar_ref! { &str, &[u8], Int256Ref<'_>, JsonbRef<'_>, ListRef<'_>, StructRef<'_>, ScalarRefImpl<'_> }
+impl_self_as_scalar_ref! { &str, &[u8], Int256Ref<'_>, JsonbRef<'_>, ListRef<'_>, StructRef<'_>, ScalarRefImpl<'_>, MapRef<'_> }
 
 /// `for_all_native_types` includes all native variants of our scalar types.
 ///
@@ -831,7 +876,7 @@ impl ScalarImpl {
                     .ok_or_else(|| "invalid value of Jsonb".to_string())?,
             ),
             DataType::Int256 => Self::Int256(Int256::from_binary(bytes)?),
-            DataType::Struct(_) | DataType::List(_) => {
+            DataType::Struct(_) | DataType::List(_)| DataType::Map(_) => {
                 return Err(format!("unsupported data type: {}", data_type).into());
             }
         };
@@ -864,6 +909,9 @@ impl ScalarImpl {
             DataType::Struct(_) => StructValue::from_str(s, data_type)?.into(),
             DataType::Jsonb => JsonbVal::from_str(s)?.into(),
             DataType::Bytea => str_to_bytea(s)?.into(),
+            DataType::Map(_) => {
+                todo!()
+            }
         })
     }
 }
@@ -931,6 +979,7 @@ impl ScalarRefImpl<'_> {
     }
 
     /// Serialize the scalar.
+    /// TODO: use serde?
     pub fn serialize(
         &self,
         ser: &mut memcomparable::Serializer<impl BufMut>,
@@ -961,6 +1010,7 @@ impl ScalarRefImpl<'_> {
             Self::Jsonb(v) => v.memcmp_serialize(ser)?,
             Self::Struct(v) => v.memcmp_serialize(ser)?,
             Self::List(v) => v.memcmp_serialize(ser)?,
+            Self::Map(v) => v.memcmp_serialize(ser)?,
         };
         Ok(())
     }
@@ -1015,6 +1065,7 @@ impl ScalarImpl {
             Ty::Jsonb => Self::Jsonb(JsonbVal::memcmp_deserialize(de)?),
             Ty::Struct(t) => StructValue::memcmp_deserialize(t.types(), de)?.to_scalar_value(),
             Ty::List(t) => ListValue::memcmp_deserialize(t, de)?.to_scalar_value(),
+            Ty::Map(_) => todo!(),
         })
     }
 
diff --git a/src/common/src/types/postgres_type.rs b/src/common/src/types/postgres_type.rs
index ae147e9c9660e..6a1e910ea4f59 100644
--- a/src/common/src/types/postgres_type.rs
+++ b/src/common/src/types/postgres_type.rs
@@ -64,6 +64,7 @@ impl DataType {
                     DataType::Serial => 8,
                     DataType::Int256 => -1,
                     DataType::List(_) | DataType::Struct(_) => -1,
+                    DataType::Map(_) =>  todo!()
                 }
             }
         }
@@ -96,6 +97,7 @@ impl DataType {
         for_all_base_types! { impl_from_oid }
     }
 
+    /// Refer to [`Self::from_oid`]
     pub fn to_oid(&self) -> i32 {
         macro_rules! impl_to_oid {
             ($( { $enum:ident | $oid:literal | $oid_array:literal | $name:ident | $input:ident | $len:literal } )*) => {
@@ -111,11 +113,13 @@ impl DataType {
                         DataType::Serial => 1016,
                         DataType::Struct(_) => -1,
                         DataType::List { .. } => unreachable!("Never reach here!"),
+                        DataType::Map(_) =>  todo!(),
                     }
                     DataType::Serial => 20,
                     DataType::Int256 => 1301,
                     // TODO: Support to give a new oid for custom struct type. #9434
                     DataType::Struct(_) => 1043,
+                    DataType::Map(_) =>  todo!(),
                 }
             }
         }
@@ -133,6 +137,7 @@ impl DataType {
                     DataType::List(_) => "list",
                     DataType::Serial => "serial",
                     DataType::Int256 => "rw_int256",
+                    DataType::Map(_) => "map",
                 }
             }
         }
diff --git a/src/common/src/types/scalar_impl.rs b/src/common/src/types/scalar_impl.rs
index d222473fa6bbb..2e7e20510f6e9 100644
--- a/src/common/src/types/scalar_impl.rs
+++ b/src/common/src/types/scalar_impl.rs
@@ -91,6 +91,14 @@ impl Scalar for ListValue {
     }
 }
 
+impl Scalar for MapValue {
+    type ScalarRefType<'a> = MapRef<'a>;
+
+    fn as_scalar_ref(&self) -> MapRef<'_> {
+        self.into()
+    }
+}
+
 /// Implement `ScalarRef` for `Box<str>`.
 /// `Box<str>` could be converted to `&str`.
 impl<'a> ScalarRef<'a> for &'a str {
@@ -316,6 +324,18 @@ impl<'a> ScalarRef<'a> for ListRef<'a> {
     }
 }
 
+impl<'a> ScalarRef<'a> for MapRef<'a> {
+    type ScalarType = MapValue;
+
+    fn to_owned_scalar(&self) -> MapValue {
+        (*self).into()
+    }
+
+    fn hash_scalar<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.hash_scalar_inner(state)
+    }
+}
+
 impl ScalarImpl {
     pub fn get_ident(&self) -> &'static str {
         dispatch_scalar_variants!(self, [I = VARIANT_NAME], { I })
diff --git a/src/common/src/types/to_binary.rs b/src/common/src/types/to_binary.rs
index 56eea301f3f61..da7f75f0a2a3f 100644
--- a/src/common/src/types/to_binary.rs
+++ b/src/common/src/types/to_binary.rs
@@ -102,6 +102,7 @@ impl ToBinary for ScalarRefImpl<'_> {
                 issue = 7949,
                 "the pgwire extended-mode encoding for {ty} is unsupported"
             ),
+            ScalarRefImpl::Map(_) => todo!(),
         }
     }
 }
diff --git a/src/common/src/types/to_sql.rs b/src/common/src/types/to_sql.rs
index 3ece8a574c450..57aab11daf4d7 100644
--- a/src/common/src/types/to_sql.rs
+++ b/src/common/src/types/to_sql.rs
@@ -46,6 +46,7 @@ impl ToSql for ScalarImpl {
             ScalarImpl::Int256(_) | ScalarImpl::Struct(_) | ScalarImpl::List(_) => {
                 bail_not_implemented!("the postgres encoding for {ty} is unsupported")
             }
+            ScalarImpl::Map(_) => todo!(),
         }
     }
 
diff --git a/src/common/src/util/memcmp_encoding.rs b/src/common/src/util/memcmp_encoding.rs
index 5a5ad598093af..c58f06f908520 100644
--- a/src/common/src/util/memcmp_encoding.rs
+++ b/src/common/src/util/memcmp_encoding.rs
@@ -170,6 +170,7 @@ fn calculate_encoded_size_inner(
             DataType::Varchar => deserializer.skip_bytes()?,
             DataType::Bytea => deserializer.skip_bytes()?,
             DataType::Int256 => Int256::MEMCMP_ENCODED_SIZE,
+            DataType::Map(_) => todo!(),
         };
 
         // consume offset of fixed_type
diff --git a/src/common/src/util/value_encoding/mod.rs b/src/common/src/util/value_encoding/mod.rs
index a3da88911ad9a..3405a4859e3b8 100644
--- a/src/common/src/util/value_encoding/mod.rs
+++ b/src/common/src/util/value_encoding/mod.rs
@@ -13,7 +13,8 @@
 // limitations under the License.
 
 //! Value encoding is an encoding format which converts the data into a binary form (not
-//! memcomparable).
+//! memcomparable, i.e., Key encoding).
+
 use bytes::{Buf, BufMut};
 use chrono::{Datelike, Timelike};
 use either::{for_both, Either};
@@ -226,6 +227,7 @@ fn serialize_scalar(value: ScalarRefImpl<'_>, buf: &mut impl BufMut) {
         ScalarRefImpl::Jsonb(v) => serialize_str(&v.value_serialize(), buf),
         ScalarRefImpl::Struct(s) => serialize_struct(s, buf),
         ScalarRefImpl::List(v) => serialize_list(v, buf),
+        ScalarRefImpl::Map(_) => todo!(),
     }
 }
 
@@ -251,6 +253,7 @@ fn estimate_serialize_scalar_size(value: ScalarRefImpl<'_>) -> usize {
         ScalarRefImpl::Jsonb(v) => v.capacity(),
         ScalarRefImpl::Struct(s) => estimate_serialize_struct_size(s),
         ScalarRefImpl::List(v) => estimate_serialize_list_size(v),
+        ScalarRefImpl::Map(_) => todo!(),
     }
 }
 
@@ -354,6 +357,7 @@ fn deserialize_value(ty: &DataType, data: &mut impl Buf) -> Result<ScalarImpl> {
         DataType::Struct(struct_def) => deserialize_struct(struct_def, data)?,
         DataType::Bytea => ScalarImpl::Bytea(deserialize_bytea(data).into()),
         DataType::List(item_type) => deserialize_list(item_type, data)?,
+        DataType::Map(_) => todo!(),
     })
 }
 
diff --git a/src/expr/macro/src/types.rs b/src/expr/macro/src/types.rs
index f2219a1c34bd6..88dfe400ffe5c 100644
--- a/src/expr/macro/src/types.rs
+++ b/src/expr/macro/src/types.rs
@@ -14,6 +14,7 @@
 
 //! This module provides utility functions for SQL data type conversion and manipulation.
 
+// TODO:: add map here
 //  name        data type   array type          owned type      ref type            primitive
 const TYPE_MATRIX: &str = "
     boolean     Boolean     BoolArray           bool            bool                _