Skip to content

Commit

Permalink
feat: impl new and scan for index
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed Feb 4, 2024
1 parent cce3ba0 commit d5952c8
Showing 1 changed file with 88 additions and 19 deletions.
107 changes: 88 additions & 19 deletions src/mito2/src/memtable/merge_tree/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::sync::Arc;
use std::sync::{Arc, RwLock};

use datatypes::arrow::array::{Array, ArrayBuilder, BinaryArray, BinaryBuilder};
use datatypes::arrow::compute;
Expand All @@ -35,18 +35,41 @@ pub(crate) struct IndexConfig {
pub(crate) max_keys_per_shard: usize,
}

impl Default for IndexConfig {
fn default() -> Self {
Self {
max_keys_per_shard: 4096,
}
}
}

/// Primary key index.
pub(crate) struct KeyIndex {
config: IndexConfig,
// TODO(yingwen): 1. Support multiple shard. 2. Lock
shard: Shard,
// TODO(yingwen): 1. Support multiple shard.
shard: RwLock<Shard>,
}

impl KeyIndex {
pub(crate) fn new(config: IndexConfig) -> KeyIndex {
KeyIndex {
config,
shard: RwLock::new(Shard::new(0)),
}
}

pub(crate) fn add_primary_key(&mut self, key: &[u8]) -> Result<PkId> {
let pkid = self.shard.try_add_primary_key(&self.config, key)?;
let mut shard = self.shard.write().unwrap();
let pkid = shard.try_add_primary_key(&self.config, key)?;
// TODO(yingwen): Switch shard if current shard is full.
Ok(pkid.unwrap())
Ok(pkid.expect("shard is full"))
}

pub(crate) fn scan_index(&self) -> Result<BoxedIndexReader> {
let shard = self.shard.read().unwrap();
let reader = shard.scan_shard()?;

Ok(Box::new(reader))
}
}

Expand All @@ -57,11 +80,20 @@ impl KeyIndex {
struct Shard {
shard_id: ShardId,
key_buffer: KeyBuffer,
dict_blocks: Vec<DictBlock>,
dict_blocks: Vec<DictBlockRef>,
num_keys: usize,
}

impl Shard {
fn new(shard_id: ShardId) -> Shard {
Shard {
shard_id,
key_buffer: KeyBuffer::new(MAX_KEYS_PER_BLOCK.into()),
dict_blocks: Vec::new(),
num_keys: 0,
}
}

fn try_add_primary_key(&mut self, config: &IndexConfig, key: &[u8]) -> Result<Option<PkId>> {
// The shard is full.
if self.num_keys >= config.max_keys_per_shard {
Expand All @@ -71,7 +103,7 @@ impl Shard {
if self.key_buffer.len() >= MAX_KEYS_PER_BLOCK.into() {
// The write buffer is full.
let dict_block = self.key_buffer.finish()?;
self.dict_blocks.push(dict_block);
self.dict_blocks.push(Arc::new(dict_block));
}

// Safety: we check the buffer length.
Expand All @@ -83,6 +115,17 @@ impl Shard {
pk_index,
}))
}

fn scan_shard(&self) -> Result<ReaderMerger> {
let block = self.key_buffer.finish_cloned()?;
let mut readers = Vec::with_capacity(self.dict_blocks.len() + 1);
readers.push(DictBlockReader::new(Arc::new(block)));
for block in &self.dict_blocks {
readers.push(DictBlockReader::new(block.clone()));
}

Ok(ReaderMerger::from_readers(readers))
}
}

// TODO(yingwen): Bench using custom container for binary and ids so we can
Expand All @@ -92,13 +135,21 @@ impl Shard {
/// Now it doesn't support searching index by key. The memtable should use another
/// cache to map primary key to its index.
struct KeyBuffer {
// TODO(yingwen): Maybe use BTreeMap as key builder.
// We use arrow's binary builder as out default binary builder
// is LargeBinaryBuilder
primary_key_builder: BinaryBuilder,
next_pk_index: usize,
}

impl KeyBuffer {
fn new(item_capacity: usize) -> KeyBuffer {
KeyBuffer {
primary_key_builder: BinaryBuilder::with_capacity(item_capacity, 0),
next_pk_index: 0,
}
}

/// Pushes a new key and returns its pk index.
///
/// # Panics
Expand Down Expand Up @@ -213,6 +264,32 @@ impl DictBlock {
}
}

/// Reader to scan index keys.
pub(crate) trait IndexReader: Send {
/// Returns whether the reader is valid.
fn is_valid(&self) -> bool;

/// Returns current key.
///
/// # Panics
/// Panics if the reader is invalid.
fn current_key(&self) -> &[u8];

/// Returns current pk index.
///
/// # Panics
/// Panics if the reader is invalid.
fn current_pk_index(&self) -> PkIndex;

/// Advance the reader.
///
/// # Panics
/// Panics if the reader is invalid.
fn next(&mut self);
}

pub(crate) type BoxedIndexReader = Box<dyn IndexReader>;

struct DictBlockReader {
block: DictBlockRef,
current: usize,
Expand All @@ -222,31 +299,21 @@ impl DictBlockReader {
fn new(block: DictBlockRef) -> Self {
Self { block, current: 0 }
}
}

impl IndexReader for DictBlockReader {
fn is_valid(&self) -> bool {
self.current < self.block.len()
}

/// Returns current key.
///
/// # Panics
/// Panics if the reader is invalid.
fn current_key(&self) -> &[u8] {
self.block.key_at(self.current)
}

/// Returns current pk index.
///
/// # Panics
/// Panics if the reader is invalid.
fn current_pk_index(&self) -> PkIndex {
self.block.pk_index_at(self.current)
}

/// Advance the reader.
///
/// # Panics
/// Panics if the reader is invalid.
fn next(&mut self) {
assert!(self.is_valid());
self.current += 1;
Expand Down Expand Up @@ -299,7 +366,9 @@ impl ReaderMerger {

ReaderMerger { heap }
}
}

impl IndexReader for ReaderMerger {
fn is_valid(&self) -> bool {
!self.heap.is_empty()
}
Expand Down

0 comments on commit d5952c8

Please sign in to comment.