map: make it compile
xxchan committed Jul 17, 2024
1 parent 0ce6228 commit 42b515a
Showing 18 changed files with 1,104 additions and 13 deletions.
16 changes: 14 additions & 2 deletions docs/dev/src/design/data-model-and-encoding.md
@@ -25,22 +25,34 @@ Composite data types:

- `Struct`: A structure with a list of named, strongly typed fields.
- `List`: A variable-length list of values with the same data type.
- `Map`:

`for_all_variants`

## In-Memory Encoding

### Array

> Source files: `common/src/array`
In-memory data is encoded in arrays for vectorized execution. For variable-length data like strings, generally we use another offset array to mark the start of encoded values in a byte buffer.
- columnar
- array and scalar
- differences from Arrow (see the `ToArrow` trait)

In-memory data is encoded in **arrays** for vectorized execution. For variable-length data like strings, generally we use another offset array to mark the start of encoded values in a byte buffer.
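
The offset-array idea can be illustrated with a minimal sketch. The type `Utf8ArraySketch` and its fields are hypothetical names chosen for this example and are not the actual types in `common/src/array`; the sketch only assumes the layout described above, where value `i` occupies the byte range between two consecutive offsets.

```rust
/// Hypothetical variable-length string array: one shared byte buffer plus offsets.
/// `offsets` always has `len + 1` entries, and value `i` occupies
/// `data[offsets[i]..offsets[i + 1]]`.
struct Utf8ArraySketch {
    offsets: Vec<usize>,
    data: Vec<u8>,
    /// `true` when the value at that position is non-NULL.
    valid: Vec<bool>,
}

impl Utf8ArraySketch {
    fn new() -> Self {
        Self { offsets: vec![0], data: Vec::new(), valid: Vec::new() }
    }

    /// Appends a value. A NULL appends no bytes, so its start and end offsets are equal.
    fn push(&mut self, value: Option<&str>) {
        if let Some(s) = value {
            self.data.extend_from_slice(s.as_bytes());
        }
        self.offsets.push(self.data.len());
        self.valid.push(value.is_some());
    }

    fn len(&self) -> usize {
        self.valid.len()
    }

    /// Reads value `i` by slicing the byte buffer between two offsets.
    fn get(&self, i: usize) -> Option<&str> {
        if !self.valid[i] {
            return None;
        }
        let bytes = &self.data[self.offsets[i]..self.offsets[i + 1]];
        Some(std::str::from_utf8(bytes).expect("only valid UTF-8 is ever pushed"))
    }
}
```

Because all values share one buffer, reading a value is a slice operation rather than a pointer chase per string.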

A Data Chunk consists of multiple columns and a visibility array, as shown in the left subgraph below. The visibility array marks each row as visible or not. This makes it possible to filter out rows while keeping the data arrays unchanged.

A Stream Chunk consists of columns, a visibility array and an additional `ops` column, as shown in the right subgraph below. The `ops` column marks the operation of each row, which can be one of `Delete`, `Insert`, `UpdateDelete` and `UpdateInsert`.

![chunk](../images/data-model-and-encoding/chunk.svg)
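
The two chunk layouts can be sketched as follows. `DataChunkSketch`, `StreamChunkSketch`, and their methods are hypothetical names used only for illustration, and the columns are simplified to `i64` values; the real chunk types are more general. The sketch shows that filtering only touches the visibility array.

```rust
/// The four row operations carried by the `ops` column.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Op {
    Insert,
    Delete,
    UpdateDelete,
    UpdateInsert,
}

/// Hypothetical data chunk: columns plus one visibility flag per row.
struct DataChunkSketch {
    columns: Vec<Vec<i64>>,
    visibility: Vec<bool>,
}

impl DataChunkSketch {
    /// Filters rows by AND-ing a mask into the visibility array.
    /// The column data itself is left untouched.
    fn apply_filter(&mut self, mask: &[bool]) {
        for (vis, keep) in self.visibility.iter_mut().zip(mask) {
            *vis = *vis && *keep;
        }
    }

    /// Materializes only the rows that are still visible.
    fn visible_rows(&self) -> Vec<Vec<i64>> {
        (0..self.visibility.len())
            .filter(|&r| self.visibility[r])
            .map(|r| self.columns.iter().map(|c| c[r]).collect())
            .collect()
    }
}

/// Hypothetical stream chunk: a data chunk plus one operation per row.
struct StreamChunkSketch {
    ops: Vec<Op>,
    data: DataChunkSketch,
}

impl StreamChunkSketch {
    /// Returns the visible rows together with their operations.
    fn visible_changes(&self) -> Vec<(Op, Vec<i64>)> {
        (0..self.ops.len())
            .filter(|&r| self.data.visibility[r])
            .map(|r| (self.ops[r], self.data.columns.iter().map(|c| c[r]).collect()))
            .collect()
    }
}
```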

### Hash Encoding

## On-Disk Encoding

> Source files: `utils/memcomparable`, `utils/value-encoding`
- key encoding: [memcomparable](https://docs.rs/memcomparable/latest/memcomparable/)
- value encoding: `util/value_encoding`
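
The property that a memcomparable key encoding provides is that comparing encoded bytes gives the same order as comparing the original values. The sketch below illustrates this for `i32` with a common technique (flip the sign bit, then write big-endian bytes). The function names are hypothetical, and this is an illustration of the general idea rather than a statement of the exact byte format used by the `memcomparable` crate.

```rust
/// Encodes an `i32` so that byte-wise comparison matches numeric comparison.
/// Flipping the sign bit maps negative numbers below non-negative ones,
/// and big-endian puts the most significant byte first.
fn encode_i32_memcomparable(v: i32) -> [u8; 4] {
    ((v as u32) ^ 0x8000_0000).to_be_bytes()
}

/// Inverse of `encode_i32_memcomparable`.
fn decode_i32_memcomparable(bytes: [u8; 4]) -> i32 {
    (u32::from_be_bytes(bytes) ^ 0x8000_0000) as i32
}
```

With such an encoding, a key-value store that only compares raw bytes can still return rows in key order.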

RisingWave stores user data in shared key-value storage called 'Hummock'. Tables, materialized views and checkpoints of internal streaming operators are encoded into key-value entries. Every field of a row, a.k.a. cell, is encoded as a key-value entry, except that `NULL` values are omitted.
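
The cell-based layout can be sketched as below. The function `encode_row_cells` and the key layout (row key bytes followed by a big-endian column index) are assumptions made for this example only; the actual key format in Hummock is not specified here. The point of the sketch is that a `NULL` cell produces no entry at all.

```rust
/// Turns one row into key-value entries, one per non-NULL cell.
/// Hypothetical key layout: `row_key` bytes followed by the column index (big-endian u32).
fn encode_row_cells(row_key: &[u8], row: &[Option<Vec<u8>>]) -> Vec<(Vec<u8>, Vec<u8>)> {
    row.iter()
        .enumerate()
        .filter_map(|(col_idx, cell)| {
            // A NULL cell is skipped, so it takes no space in storage.
            let value = cell.as_ref()?;
            let mut key = row_key.to_vec();
            key.extend_from_slice(&(col_idx as u32).to_be_bytes());
            Some((key, value.clone()))
        })
        .collect()
}
```

When reading a row back under this scheme, a column whose entry is missing is interpreted as `NULL`.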

2 changes: 2 additions & 0 deletions proto/data.proto
@@ -52,6 +52,7 @@ message DataType {
JSONB = 18;
SERIAL = 19;
INT256 = 20;
MAP = 21;
}
TypeName type_name = 1;
// Data length for char.
@@ -102,6 +103,7 @@ enum ArrayType {
JSONB = 16;
SERIAL = 17;
INT256 = 18;
MAP = 20;
}

message Array {
37 changes: 37 additions & 0 deletions src/common/src/array/arrow/arrow_impl.rs
@@ -42,6 +42,7 @@

use std::fmt::Write;

use arrow_array_iceberg::array;
use arrow_buffer::OffsetBuffer;
use chrono::{DateTime, NaiveDateTime, NaiveTime};
use itertools::Itertools;
@@ -113,6 +114,7 @@ pub trait ToArrow {
ArrayImpl::Serial(array) => self.serial_to_arrow(array),
ArrayImpl::List(array) => self.list_to_arrow(data_type, array),
ArrayImpl::Struct(array) => self.struct_to_arrow(data_type, array),
ArrayImpl::Map(array) => self.map_to_arrow(data_type, array),
}?;
if arrow_array.data_type() != data_type {
arrow_cast::cast(&arrow_array, data_type).map_err(ArrayError::to_arrow)
@@ -267,6 +269,27 @@ pub trait ToArrow {
)))
}

    #[inline]
    fn map_to_arrow(
        &self,
        data_type: &arrow_schema::DataType,
        array: &MapArray,
    ) -> Result<arrow_array::ArrayRef, ArrayError> {
        todo!()
        // let arrow_schema::DataType::List(field) = data_type else {
        //     return Err(ArrayError::to_arrow("Invalid list type"));
        // };
        // let values = self.to_array(field.data_type(), array.values())?;
        // let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
        // let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
        // Ok(Arc::new(arrow_array::ListArray::new(
        //     field.clone(),
        //     offsets,
        //     values,
        //     nulls,
        // )))
    }

/// Convert RisingWave data type to Arrow data type.
///
/// This function returns a `Field` instead of `DataType` because some may be converted to
@@ -297,6 +320,7 @@ pub trait ToArrow {
DataType::Jsonb => return Ok(self.jsonb_type_to_arrow(name)),
DataType::Struct(fields) => self.struct_type_to_arrow(fields)?,
DataType::List(datatype) => self.list_type_to_arrow(datatype)?,
DataType::Map(datatype) => self.map_type_to_arrow(datatype)?,
};
Ok(arrow_schema::Field::new(name, data_type, true))
}
@@ -413,6 +437,19 @@ pub trait ToArrow {
.try_collect::<_, _, ArrayError>()?,
))
}

    #[inline]
    fn map_type_to_arrow(&self, elem_type: &MapType) -> Result<arrow_schema::DataType, ArrayError> {
        let sorted = false;
        let key = todo!();
        let value = todo!();
        let struct_ = arrow_schema::DataType::Struct([key, value].into());
        Ok(arrow_schema::DataType::Map(
            todo!(),
            // Arc::new(self.to_arrow_field("item", struct_)?),
            sorted,
        ))
    }
}

/// Defines how to convert Arrow arrays to RisingWave arrays.