Skip to content

Commit

Permalink
map: make it compie
Browse files Browse the repository at this point in the history
xxchan committed Jul 16, 2024
1 parent a31225a commit 94d9d83
Showing 18 changed files with 1,104 additions and 13 deletions.
16 changes: 14 additions & 2 deletions docs/dev/src/design/data-model-and-encoding.md
Original file line number Diff line number Diff line change
@@ -25,22 +25,34 @@ Composite data types:

- `Struct`: A structure with a list of named, strong-typed fields.
- `List`: A variable-length list of values with same data type.
- `Map`:

`for_all_variants`

## In-Memory Encoding

### Array

> Source files: `common/src/array`
In-memory data is encoded in arrays for vectorized execution. For variable-length data like strings, generally we use another offset array to mark the start of encoded values in a byte buffer.
- columnar
- array and scalar
- difference between arrow (see ToArrow trait)

In-memory data is encoded in **arrays** for vectorized execution. For variable-length data like strings, generally we use another offset array to mark the start of encoded values in a byte buffer.

A Data Chunk consists of multiple columns and a visibility array, as is shown in the left subgraph below. The visibility array marks each row as visible or not. This helps filtering some rows while keeping other data arrays unchanged.

A Stream Chunk consists of columns, visibility array and an additional `ops` column, as is shown in the right subgraph below. The `ops` column marks the operation of row, which can be one of `Delete`, `Insert`, `UpdateDelete` and `UpdateInsert`.

![chunk](../images/data-model-and-encoding/chunk.svg)

### Hash Encoding

## On-Disk Encoding

> Source files: `utils/memcomparable`, `utils/value-encoding`
- key encoding: [memcomparable](https://docs.rs/memcomparable/latest/memcomparable/)
- value encoding: `util/value_encoding`

RisingWave stores user data in shared key-value storage called 'Hummock'. Tables, materialized views and checkpoints of internal streaming operators are encoded into key-value entries. Every field of a row, a.k.a. cell, is encoded as a key-value entry, except that `NULL` values are omitted.

2 changes: 2 additions & 0 deletions proto/data.proto
Original file line number Diff line number Diff line change
@@ -52,6 +52,7 @@ message DataType {
JSONB = 18;
SERIAL = 19;
INT256 = 20;
MAP = 21;
}
TypeName type_name = 1;
// Data length for char.
@@ -102,6 +103,7 @@ enum ArrayType {
JSONB = 16;
SERIAL = 17;
INT256 = 18;
MAP = 20;
}

message Array {
37 changes: 37 additions & 0 deletions src/common/src/array/arrow/arrow_impl.rs
Original file line number Diff line number Diff line change
@@ -42,6 +42,7 @@

use std::fmt::Write;

use arrow_array_iceberg::array;
use arrow_buffer::OffsetBuffer;
use chrono::{DateTime, NaiveDateTime, NaiveTime};
use itertools::Itertools;
@@ -113,6 +114,7 @@ pub trait ToArrow {
ArrayImpl::Serial(array) => self.serial_to_arrow(array),
ArrayImpl::List(array) => self.list_to_arrow(data_type, array),
ArrayImpl::Struct(array) => self.struct_to_arrow(data_type, array),
ArrayImpl::Map(array) => self.map_to_arrow(data_type, array),
}?;
if arrow_array.data_type() != data_type {
arrow_cast::cast(&arrow_array, data_type).map_err(ArrayError::to_arrow)
@@ -267,6 +269,27 @@ pub trait ToArrow {
)))
}

#[inline]
fn map_to_arrow(
&self,
data_type: &arrow_schema::DataType,
array: &MapArray,
) -> Result<arrow_array::ArrayRef, ArrayError> {
todo!()
// let arrow_schema::DataType::List(field) = data_type else {
// return Err(ArrayError::to_arrow("Invalid list type"));
// };
// let values = self.to_array(field.data_type(), array.values())?;
// let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
// let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
// Ok(Arc::new(arrow_array::ListArray::new(
// field.clone(),
// offsets,
// values,
// nulls,
// )))
}

/// Convert RisingWave data type to Arrow data type.
///
/// This function returns a `Field` instead of `DataType` because some may be converted to
@@ -297,6 +320,7 @@ pub trait ToArrow {
DataType::Jsonb => return Ok(self.jsonb_type_to_arrow(name)),
DataType::Struct(fields) => self.struct_type_to_arrow(fields)?,
DataType::List(datatype) => self.list_type_to_arrow(datatype)?,
DataType::Map(datatype) => self.map_type_to_arrow(datatype)?,
};
Ok(arrow_schema::Field::new(name, data_type, true))
}
@@ -413,6 +437,19 @@ pub trait ToArrow {
.try_collect::<_, _, ArrayError>()?,
))
}

#[inline]
fn map_type_to_arrow(&self, elem_type: &MapType) -> Result<arrow_schema::DataType, ArrayError> {
let sorted = false;
let key = todo!();
let value = todo!();
let struct_ = arrow_schema::DataType::Struct([key, value].into());
Ok(arrow_schema::DataType::Map(
todo!(),
// Arc::new(self.to_arrow_field("item", struct_)?),
sorted,
))
}
}

/// Defines how to convert Arrow arrays to RisingWave arrays.
Loading

0 comments on commit 94d9d83

Please sign in to comment.