Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: write data to the tree #9

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/mito2/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ const VECTOR_TYPE: &str = "vector";
const PAGE_TYPE: &str = "page";
// Metrics type key for files on the local store.
const FILE_TYPE: &str = "file";
/// Metrics type key for pkid index in the memtable.
pub(crate) const PK_ID_TYPE: &str = "pkid";

/// Manages cached data for the engine.
///
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//! Memtables are write buffers for regions.

pub mod key_values;
#[allow(unused)]
pub mod merge_tree;
pub mod time_series;
pub(crate) mod version;
Expand Down
48 changes: 39 additions & 9 deletions src/mito2/src/memtable/merge_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@
//! - Flushes mutable parts to immutable parts
//! - Merges small immutable parts into a big immutable part

#[allow(unused)]
mod data;
#[allow(unused)]
mod index;
// TODO(yingwen): Remove this mod.
mod mutable;
mod tree;

use std::fmt;
use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};
use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;
Expand All @@ -45,15 +45,30 @@ pub(crate) type ShardId = u32;
/// Index of a primary key in a shard.
pub(crate) type PkIndex = u16;
/// Id of a primary key.
#[allow(unused)]
#[derive(Debug, Clone, Copy)]
pub(crate) struct PkId {
    /// Shard that holds the primary key.
    pub(crate) shard_id: ShardId,
    /// Position of the primary key inside that shard.
    pub(crate) pk_index: PkIndex,
}

/// Config for the merge tree memtable.
#[derive(Debug, Default, Clone)]
pub struct MergeTreeConfig {}
#[derive(Debug, Clone)]
pub struct MergeTreeConfig {
    /// Max keys in an index shard.
    index_max_keys_per_shard: usize,
    /// Maximum capacity of the primary key cache.
    pk_cache_size: ReadableSize,
}

impl Default for MergeTreeConfig {
fn default() -> Self {
Self {
// TODO(yingwen): Use 4096 or find a proper value.
index_max_keys_per_shard: 8192,
pk_cache_size: ReadableSize::mb(256),
}
}
}

/// Memtable based on a merge tree.
pub struct MergeTreeMemtable {
Expand Down Expand Up @@ -103,7 +118,8 @@ impl Memtable for MergeTreeMemtable {
fn freeze(&self) -> Result<()> {
    // No further allocations will be recorded against this memtable.
    self.alloc_tracker.done_allocating();

    // Freeze the underlying tree; its result is the result of the whole call.
    self.tree.freeze()
}

Expand Down Expand Up @@ -135,8 +151,14 @@ impl Memtable for MergeTreeMemtable {
}
}

fn fork(&self, _id: MemtableId, _metadata: &RegionMetadataRef) -> MemtableRef {
unimplemented!()
fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
    // Fork the underlying tree under the given region metadata.
    let forked_tree = self.tree.fork(metadata.clone());

    // The forked memtable reuses this memtable's write buffer manager.
    let memtable = MergeTreeMemtable::with_tree(
        id,
        forked_tree,
        self.alloc_tracker.write_buffer_manager(),
    );
    Arc::new(memtable)
}
}

Expand All @@ -147,10 +169,18 @@ impl MergeTreeMemtable {
metadata: RegionMetadataRef,
write_buffer_manager: Option<WriteBufferManagerRef>,
config: &MergeTreeConfig,
) -> Self {
Self::with_tree(id, MergeTree::new(metadata, config), write_buffer_manager)
}

fn with_tree(
id: MemtableId,
tree: MergeTree,
write_buffer_manager: Option<WriteBufferManagerRef>,
) -> Self {
Self {
id,
tree: Arc::new(MergeTree::new(metadata, config)),
tree: Arc::new(tree),
alloc_tracker: AllocTracker::new(write_buffer_manager),
max_timestamp: AtomicI64::new(i64::MIN),
min_timestamp: AtomicI64::new(i64::MAX),
Expand Down
5 changes: 5 additions & 0 deletions src/mito2/src/memtable/merge_tree/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ impl DataBuffer {
/// Returns the number of rows stored in the buffer.
pub fn num_rows(&self) -> usize {
    self.ts_builder.len()
}

/// Returns whether the buffer is empty.
pub fn is_empty(&self) -> bool {
    self.num_rows() == 0
}
}

struct DataPartEncoder<'a> {
Expand Down
62 changes: 50 additions & 12 deletions src/mito2/src/memtable/merge_tree/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,27 +30,21 @@ use crate::memtable::merge_tree::{PkId, PkIndex, ShardId};
const MAX_KEYS_PER_BLOCK: u16 = 256;

/// Config for the index.
#[derive(Debug, Clone)]
pub(crate) struct IndexConfig {
/// Max keys in an index shard.
pub(crate) max_keys_per_shard: usize,
}

impl Default for IndexConfig {
fn default() -> Self {
Self {
// TODO(yingwen): Use 4096 or find a proper value.
max_keys_per_shard: 8192,
}
}
}

/// Primary key index.
pub(crate) struct KeyIndex {
config: IndexConfig,
// TODO(yingwen): 1. Support multiple shard.
shard: RwLock<MutableShard>,
}

pub(crate) type KeyIndexRef = Arc<KeyIndex>;

impl KeyIndex {
pub(crate) fn new(config: IndexConfig) -> KeyIndex {
KeyIndex {
Expand All @@ -59,11 +53,11 @@ impl KeyIndex {
}
}

pub(crate) fn add_primary_key(&self, key: &[u8]) -> Result<PkId> {
pub(crate) fn write_primary_key(&self, key: &[u8]) -> Result<PkId> {
let mut shard = self.shard.write().unwrap();
let pkid = shard.try_add_primary_key(&self.config, key)?;
let pk_id = shard.try_add_primary_key(&self.config, key)?;
// TODO(yingwen): Switch shard if current shard is full.
Ok(pkid.expect("shard is full"))
Ok(pk_id.expect("shard is full"))
}

pub(crate) fn scan_index(&self) -> Result<BoxedIndexReader> {
Expand All @@ -72,6 +66,25 @@ impl KeyIndex {

Ok(Box::new(reader))
}

/// Freezes the index.
///
/// Finishes the shard's active key buffer so its keys become an
/// immutable dict block.
pub(crate) fn freeze(&self) -> Result<()> {
    // Freezing mutates the shard in place, so take the write lock.
    let mut shard = self.shard.write().unwrap();
    shard.freeze()
}

/// Returns a new index for write.
///
/// Callers must freeze the index first.
pub(crate) fn fork(&self) -> KeyIndex {
    // Forking only reads the current shard, so a read lock suffices;
    // the guard is released as soon as the fork completes.
    let forked_shard = self.shard.read().unwrap().fork();

    KeyIndex {
        config: self.config.clone(),
        shard: RwLock::new(forked_shard),
    }
}
}

// TODO(yingwen): Support partition index (partition by a column, e.g. table_id) to
Expand Down Expand Up @@ -127,6 +140,25 @@ impl MutableShard {

Ok(ReaderMerger::from_readers(readers))
}

fn freeze(&mut self) -> Result<()> {
    // Nothing to finish when no keys are buffered.
    if !self.key_buffer.is_empty() {
        // Turn the buffered keys into an immutable dict block.
        let dict_block = self.key_buffer.finish()?;
        self.dict_blocks.push(Arc::new(dict_block));
    }
    Ok(())
}

/// Returns a forked shard for new writes.
fn fork(&self) -> MutableShard {
    MutableShard {
        shard_id: self.shard_id,
        // The forked shard starts with a fresh, empty key buffer.
        key_buffer: KeyBuffer::new(MAX_KEYS_PER_BLOCK.into()),
        // Dict blocks are Arc-ed (see `freeze`), so cloning the Vec only
        // bumps reference counts and shares the frozen blocks.
        dict_blocks: self.dict_blocks.clone(),
        num_keys: self.num_keys,
    }
}
}

// TODO(yingwen): Bench using custom container for binary and ids so we can
Expand Down Expand Up @@ -167,6 +199,10 @@ impl KeyBuffer {
self.primary_key_builder.len()
}

/// Returns whether the buffer contains no primary keys.
fn is_empty(&self) -> bool {
    self.primary_key_builder.is_empty()
}

/// Gets the primary key by its index.
///
/// # Panics
Expand Down Expand Up @@ -393,3 +429,5 @@ impl IndexReader for ReaderMerger {
}
}
}

// TODO(yingwen): Tests
Loading
Loading