Skip to content

Commit

Permalink
feat: Implement write buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed Jan 31, 2024
1 parent 48791f5 commit 4fc9633
Showing 1 changed file with 72 additions and 59 deletions.
131 changes: 72 additions & 59 deletions src/mito2/src/memtable/merge_tree/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@

//! Primary key index of the merge tree.
use datatypes::arrow::array::{
ArrayBuilder, BinaryArray, BinaryBuilder, UInt16Array, UInt16Builder,
};
use datatypes::arrow::array::{Array, ArrayBuilder, BinaryArray, BinaryBuilder};
use datatypes::arrow::compute;
use snafu::ResultExt;

Expand All @@ -25,7 +23,7 @@ use crate::memtable::merge_tree::{PkId, PkIndex, ShardId};

// TODO(yingwen): Consider using byte size to manage block.
/// Maximum keys in a block. Should be power of 2.
const MAX_KEYS_PER_BLOCK: usize = 256;
const MAX_KEYS_PER_BLOCK: u16 = 256;

/// Config for the index.
pub(crate) struct IndexConfig {
Expand Down Expand Up @@ -60,12 +58,13 @@ impl MutableShard {
return Ok(None);
}

if self.write_buffer.len() >= MAX_KEYS_PER_BLOCK {
if self.write_buffer.len() >= MAX_KEYS_PER_BLOCK.into() {
// The write buffer is full.
let dict_block = self.write_buffer.finish_dict_block()?;
let dict_block = self.write_buffer.finish()?;
self.dict_blocks.push(dict_block);
}

// Safety: we check the buffer length.
let pk_index = self.write_buffer.push_key(key);

Ok(Some(PkId {
Expand All @@ -77,95 +76,109 @@ impl MutableShard {

// TODO(yingwen): Bench using custom container for binary and ids so we can
// sort the buffer in place and reuse memory.
struct DictBlockBuilder {
/// Buffer to store unsorted primary keys.
///
/// Now it doesn't support searching index by key. The memtable should use another
/// cache to map primary key to its index.
struct WriteBuffer {
// We use arrow's binary builder as our default binary builder
// is LargeBinaryBuilder
primary_key: BinaryBuilder,
// TODO(yingwen): We don't need to store index in the builder, we only need
// to store start index.
pk_index: UInt16Builder,
primary_key_builder: BinaryBuilder,
next_pk_index: usize,
}

impl DictBlockBuilder {
fn push_key(&mut self, key: &[u8], index: PkIndex) {
self.primary_key.append_value(key);
self.pk_index.append_value(index);
}

/// Builds and sorts the key dict.
fn finish(&mut self) -> Result<DictBlock> {
// TODO(yingwen): We can check whether keys are already sorted first. But
// we might need some benchmarks.
let primary_key = self.primary_key.finish();
let pk_index = self.pk_index.finish();

DictBlock::try_new(primary_key, pk_index)
}

fn finish_cloned(&self) -> Result<DictBlock> {
let primary_key = self.primary_key.finish_cloned();
let pk_index = self.pk_index.finish_cloned();
impl WriteBuffer {
/// Pushes a new key and returns its pk index.
///
/// # Panics
/// Panics if the [PkIndex] type cannot represent the index.
fn push_key(&mut self, key: &[u8]) -> PkIndex {
let pk_index = self.next_pk_index.try_into().unwrap();
self.next_pk_index += 1;
self.primary_key_builder.append_value(key);

DictBlock::try_new(primary_key, pk_index)
pk_index
}

fn len(&self) -> usize {
self.primary_key.len()
self.primary_key_builder.len()
}
}

struct WriteBuffer {
builder: DictBlockBuilder,
next_index: usize,
}

impl WriteBuffer {
/// Push a new key.
/// Gets the primary key by its index.
///
/// # Panics
/// Panics if the index will overflow.
fn push_key(&mut self, key: &[u8]) -> PkIndex {
let pk_index = self.next_index.try_into().unwrap();
self.next_index += 1;
self.builder.push_key(key, pk_index);

pk_index
/// Panics if the index is invalid.
fn get_key(&self, index: PkIndex) -> &[u8] {
    // Widening the index to usize is lossless.
    let pos = index as usize;
    let offsets = self.primary_key_builder.offsets_slice();
    // Keys are only ever appended as values, so there is no null to check for.
    // Builder offsets are expected to be non-negative, so the casts keep their value.
    let (start, end) = (offsets[pos] as usize, offsets[pos + 1] as usize);

    // The bytes of the key live between two consecutive offsets.
    &self.primary_key_builder.values_slice()[start..end]
}

fn len(&self) -> usize {
self.builder.len()
fn finish(&mut self) -> Result<DictBlock> {
    // TODO(yingwen): We can check whether keys are already sorted first. But
    // we might need some benchmarks.
    // Finish the builder into an array of the buffered keys; `DictBlock` sorts them.
    DictBlock::try_from_unsorted(self.primary_key_builder.finish())
}

fn finish_dict_block(&mut self) -> Result<DictBlock> {
self.builder.finish()
fn finish_cloned(&self) -> Result<DictBlock> {
    // `finish_cloned` only needs `&self`, so this buffer is not modified here;
    // `DictBlock` sorts the resulting array of keys.
    DictBlock::try_from_unsorted(self.primary_key_builder.finish_cloned())
}
}

struct DictBlock {
/// Sorted primary key buffer.
primary_key: BinaryArray,
pk_index: UInt16Array,
/// PkIndex sorted by primary keys.
ordered_pk_index: Vec<PkIndex>,
/// Sort weight of each PkIndex. It also maps the PkIndex to the position
/// of the primary key in the sorted key buffer.
index_weight: Vec<u16>,
}

impl DictBlock {
fn try_new(primary_key: BinaryArray, pk_index: UInt16Array) -> Result<Self> {
fn try_from_unsorted(primary_key: BinaryArray) -> Result<Self> {
assert!(primary_key.len() <= PkIndex::MAX.into());

// Sort by primary key.
let indices =
compute::sort_to_indices(&primary_key, None, None).context(ComputeArrowSnafu)?;
let primary_key = compute::take(&primary_key, &indices, None).context(ComputeArrowSnafu)?;
let pk_index = compute::take(&pk_index, &indices, None).context(ComputeArrowSnafu)?;

// Weight of each pk index. We have check the length of the primary key.
let index_weight: Vec<_> = indices.values().iter().map(|idx| *idx as u16).collect();

let mut ordered_pk_index = vec![0; primary_key.len()];
for (pk_index, dest_pos) in index_weight.iter().enumerate() {
debug_assert!(pk_index <= PkIndex::MAX.into());

ordered_pk_index[*dest_pos as usize] = pk_index as PkIndex;
}

let dict = DictBlock {
primary_key: primary_key
.as_any()
.downcast_ref::<BinaryArray>()
.unwrap()
.clone(),
pk_index: pk_index
.as_any()
.downcast_ref::<UInt16Array>()
.unwrap()
.clone(),
ordered_pk_index,
index_weight,
};
Ok(dict)
}

fn get_key(&self, index: PkIndex) -> &[u8] {
// Casting index to usize is safe.
let pos = self.index_weight[index as usize];
self.primary_key.value(pos as usize)
}
}

0 comments on commit 4fc9633

Please sign in to comment.