diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index d0707e1bae48..a3be68606899 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -45,6 +45,8 @@ const VECTOR_TYPE: &str = "vector"; const PAGE_TYPE: &str = "page"; // Metrics type key for files on the local store. const FILE_TYPE: &str = "file"; +/// Metrics type key for pkid index in the memtable. +pub(crate) const PK_ID_TYPE: &str = "pkid"; /// Manages cached data for the engine. /// diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index 3558a0e6cd3d..97d8e453b1d3 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -15,6 +15,7 @@ //! Memtables are write buffers for regions. pub mod key_values; +#[allow(unused)] pub mod merge_tree; pub mod time_series; pub(crate) mod version; diff --git a/src/mito2/src/memtable/merge_tree.rs b/src/mito2/src/memtable/merge_tree.rs index 5f06a3cbdbb5..459e489e39e6 100644 --- a/src/mito2/src/memtable/merge_tree.rs +++ b/src/mito2/src/memtable/merge_tree.rs @@ -16,10 +16,9 @@ //! - Flushes mutable parts to immutable parts //! - Merges small immutable parts into a big immutable part -#[allow(unused)] mod data; -#[allow(unused)] mod index; +// TODO(yingwen): Remove this mod. mod mutable; mod tree; @@ -27,6 +26,7 @@ use std::fmt; use std::sync::atomic::{AtomicI64, AtomicU32, Ordering}; use std::sync::Arc; +use common_base::readable_size::ReadableSize; use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; use table::predicate::Predicate; @@ -45,15 +45,30 @@ pub(crate) type ShardId = u32; /// Index of a primary key in a shard. pub(crate) type PkIndex = u16; /// Id of a primary key. -#[allow(unused)] +#[derive(Debug, Clone, Copy)] pub(crate) struct PkId { pub(crate) shard_id: ShardId, pub(crate) pk_index: PkIndex, } /// Config for the merge tree memtable. -#[derive(Debug, Default, Clone)] -pub struct MergeTreeConfig {} +#[derive(Debug, Clone)] +pub struct MergeTreeConfig { + /// Max keys in an index shard. + index_max_keys_per_shard: usize, + /// Max capacity of pk cache size. + pk_cache_size: ReadableSize, +} + +impl Default for MergeTreeConfig { + fn default() -> Self { + Self { + // TODO(yingwen): Use 4096 or find a proper value. + index_max_keys_per_shard: 8192, + pk_cache_size: ReadableSize::mb(256), + } + } +} /// Memtable based on a merge tree. pub struct MergeTreeMemtable { @@ -103,7 +118,8 @@ impl Memtable for MergeTreeMemtable { fn freeze(&self) -> Result<()> { self.alloc_tracker.done_allocating(); - // TODO(yingwen): Freeze the tree. + self.tree.freeze()?; + Ok(()) } @@ -135,8 +151,14 @@ impl Memtable for MergeTreeMemtable { } } - fn fork(&self, _id: MemtableId, _metadata: &RegionMetadataRef) -> MemtableRef { - unimplemented!() + fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef { + let tree = self.tree.fork(metadata.clone()); + + Arc::new(MergeTreeMemtable::with_tree( + id, + tree, + self.alloc_tracker.write_buffer_manager(), + )) } } @@ -147,10 +169,18 @@ impl MergeTreeMemtable { metadata: RegionMetadataRef, write_buffer_manager: Option, config: &MergeTreeConfig, + ) -> Self { + Self::with_tree(id, MergeTree::new(metadata, config), write_buffer_manager) + } + + fn with_tree( + id: MemtableId, + tree: MergeTree, + write_buffer_manager: Option, ) -> Self { Self { id, - tree: Arc::new(MergeTree::new(metadata, config)), + tree: Arc::new(tree), alloc_tracker: AllocTracker::new(write_buffer_manager), max_timestamp: AtomicI64::new(i64::MIN), min_timestamp: AtomicI64::new(i64::MAX), diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index dd15e513c16c..dc2336cbbef4 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -125,6 +125,11 @@ impl DataBuffer { pub fn num_rows(&self) -> usize { self.ts_builder.len() } + + /// Returns whether the buffer is empty. + pub fn is_empty(&self) -> bool { + self.num_rows() == 0 + } } struct DataPartEncoder<'a> { diff --git a/src/mito2/src/memtable/merge_tree/index.rs b/src/mito2/src/memtable/merge_tree/index.rs index 4dd7ad9101ba..d54321af4671 100644 --- a/src/mito2/src/memtable/merge_tree/index.rs +++ b/src/mito2/src/memtable/merge_tree/index.rs @@ -30,20 +30,12 @@ use crate::memtable::merge_tree::{PkId, PkIndex, ShardId}; const MAX_KEYS_PER_BLOCK: u16 = 256; /// Config for the index. +#[derive(Debug, Clone)] pub(crate) struct IndexConfig { /// Max keys in an index shard. pub(crate) max_keys_per_shard: usize, } -impl Default for IndexConfig { - fn default() -> Self { - Self { - // TODO(yingwen): Use 4096 or find a proper value. - max_keys_per_shard: 8192, - } - } -} - /// Primary key index. pub(crate) struct KeyIndex { config: IndexConfig, @@ -51,6 +43,8 @@ pub(crate) struct KeyIndex { shard: RwLock, } +pub(crate) type KeyIndexRef = Arc; + impl KeyIndex { pub(crate) fn new(config: IndexConfig) -> KeyIndex { KeyIndex { @@ -59,11 +53,11 @@ impl KeyIndex { } } - pub(crate) fn add_primary_key(&self, key: &[u8]) -> Result { + pub(crate) fn write_primary_key(&self, key: &[u8]) -> Result { let mut shard = self.shard.write().unwrap(); - let pkid = shard.try_add_primary_key(&self.config, key)?; + let pk_id = shard.try_add_primary_key(&self.config, key)?; // TODO(yingwen): Switch shard if current shard is full. - Ok(pkid.expect("shard is full")) + Ok(pk_id.expect("shard is full")) } pub(crate) fn scan_index(&self) -> Result { @@ -72,6 +66,25 @@ impl KeyIndex { Ok(Box::new(reader)) } + + /// Freezes the index. + pub(crate) fn freeze(&self) -> Result<()> { + let mut shard = self.shard.write().unwrap(); + shard.freeze() + } + + /// Returns a new index for write. + /// + /// Callers must freeze the index first. + pub(crate) fn fork(&self) -> KeyIndex { + let current_shard = self.shard.read().unwrap(); + let shard = current_shard.fork(); + + KeyIndex { + config: self.config.clone(), + shard: RwLock::new(shard), + } + } } // TODO(yingwen): Support partition index (partition by a column, e.g. table_id) to @@ -127,6 +140,25 @@ impl MutableShard { Ok(ReaderMerger::from_readers(readers)) } + + fn freeze(&mut self) -> Result<()> { + if self.key_buffer.is_empty() { + return Ok(()); + } + + let dict_block = self.key_buffer.finish()?; + self.dict_blocks.push(Arc::new(dict_block)); + Ok(()) + } + + fn fork(&self) -> MutableShard { + MutableShard { + shard_id: self.shard_id, + key_buffer: KeyBuffer::new(MAX_KEYS_PER_BLOCK.into()), + dict_blocks: self.dict_blocks.clone(), + num_keys: self.num_keys, + } + } } // TODO(yingwen): Bench using custom container for binary and ids so we can @@ -167,6 +199,10 @@ impl KeyBuffer { self.primary_key_builder.len() } + fn is_empty(&self) -> bool { + self.primary_key_builder.is_empty() + } + /// Gets the primary key by its index. /// /// # Panics @@ -393,3 +429,5 @@ impl IndexReader for ReaderMerger { } } } + +// TODO(yingwen): Tests diff --git a/src/mito2/src/memtable/merge_tree/tree.rs b/src/mito2/src/memtable/merge_tree/tree.rs index 38829388aafc..25da87ff2435 100644 --- a/src/mito2/src/memtable/merge_tree/tree.rs +++ b/src/mito2/src/memtable/merge_tree/tree.rs @@ -16,24 +16,40 @@ use std::sync::{Arc, RwLock}; +use api::v1::OpType; +use common_time::Timestamp; +use moka::sync::Cache; +use snafu::ensure; use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; use table::predicate::Predicate; -use crate::error::Result; -use crate::memtable::merge_tree::mutable::{MutablePart, WriteMetrics}; -use crate::memtable::merge_tree::MergeTreeConfig; +use crate::cache::PK_ID_TYPE; +use crate::error::{PrimaryKeyLengthMismatchSnafu, Result}; +use crate::memtable::key_values::KeyValue; +use crate::memtable::merge_tree::data::DataBuffer; +use crate::memtable::merge_tree::index::{IndexConfig, KeyIndex, KeyIndexRef}; +use crate::memtable::merge_tree::mutable::WriteMetrics; +use crate::memtable::merge_tree::{MergeTreeConfig, PkId}; use crate::memtable::{BoxedBatchIterator, KeyValues}; -use crate::row_converter::{McmpRowCodec, SortField}; +use crate::metrics::CACHE_BYTES; +use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; + +/// Initial capacity for the data buffer. +const DATA_INIT_CAP: usize = 8; /// The merge tree. pub(crate) struct MergeTree { + /// Config of the tree. + config: MergeTreeConfig, /// Metadata of the region. pub(crate) metadata: RegionMetadataRef, /// Primary key codec. row_codec: Arc, - /// Mutable part of the tree. - mutable: RwLock, + // TODO(yingwen): The pk id cache allocates many small objects. We might need some benchmarks to see whether + // it is necessary to use another way to get the id from pk. + pk_id_cache: Option, + parts: RwLock, } pub(crate) type MergeTreeRef = Arc; @@ -47,39 +63,218 @@ impl MergeTree { .map(|c| SortField::new(c.column_schema.data_type.clone())) .collect(), ); + let pk_id_cache = (!metadata.primary_key.is_empty() + && config.pk_cache_size.as_bytes() != 0) + .then(|| new_cache(config.pk_cache_size.as_bytes())); + + let index = (!metadata.primary_key.is_empty()).then(|| { + Arc::new(KeyIndex::new(IndexConfig { + max_keys_per_shard: config.index_max_keys_per_shard, + })) + }); + let data_buffer = DataBuffer::with_capacity(metadata.clone(), DATA_INIT_CAP); + let parts = TreeParts { + immutable: false, + index, + data_buffer, + }; + MergeTree { + config: config.clone(), metadata, row_codec: Arc::new(row_codec), - mutable: RwLock::new(MutablePart::new(config)), + pk_id_cache, + parts: RwLock::new(parts), } } + // FIXME(yingwen): We should use actual size of parts. /// Write key-values into the tree. + /// + /// # Panics + /// Panics if the tree is immutable. pub(crate) fn write(&self, kvs: &KeyValues, metrics: &mut WriteMetrics) -> Result<()> { - let mut part = self.mutable.write().unwrap(); - part.write(&self.metadata, &self.row_codec, kvs, metrics) + let mut primary_key = Vec::new(); + let has_pk = !self.metadata.primary_key.is_empty(); + + for kv in kvs.iter() { + ensure!( + kv.num_primary_keys() == self.row_codec.num_fields(), + PrimaryKeyLengthMismatchSnafu { + expect: self.row_codec.num_fields(), + actual: kv.num_primary_keys(), + } + ); + // Safety: timestamp of kv must be both present and a valid timestamp value. + let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value(); + metrics.min_ts = metrics.min_ts.min(ts); + metrics.max_ts = metrics.max_ts.max(ts); + metrics.value_bytes += kv.fields().map(|v| v.data_size()).sum::(); + + if !has_pk { + // No primary key. + // Now we always assign the first shard and the first pk index to the id. + let pk_id = PkId { + shard_id: 0, + pk_index: 0, + }; + self.write_with_id(pk_id, kv); + continue; + } + + // Encode primary key. + primary_key.clear(); + self.row_codec + .encode_to_vec(kv.primary_keys(), &mut primary_key)?; + + // Add bytes used by the primary key. + metrics.key_bytes += primary_key.len(); + + // Write rows with primary keys. + self.write_with_key(&primary_key, kv)?; + } + + metrics.value_bytes += + kvs.num_rows() * (std::mem::size_of::() + std::mem::size_of::()); + + Ok(()) } /// Scans the tree. pub(crate) fn scan( &self, - projection: Option<&[ColumnId]>, - predicate: Option, + _projection: Option<&[ColumnId]>, + _predicate: Option, ) -> Result { - let mutable = self.mutable.read().unwrap(); - - mutable.scan_part( - &self.metadata, - self.row_codec.clone(), - projection, - predicate.as_ref(), - true, - ) + todo!() } /// Returns true if the tree is empty. pub(crate) fn is_empty(&self) -> bool { - let mutable = self.mutable.write().unwrap(); - mutable.is_empty() + let parts = self.parts.read().unwrap(); + // Gets whether the memtable is empty from the data part. + parts.data_buffer.is_empty() + // TODO(yingwen): Also consider other parts if we freeze the data buffer. + } + + /// Marks the tree as immutable. + /// + /// Once the tree becomes immutable, callers should not write to it again. + pub(crate) fn freeze(&self) -> Result<()> { + let mut parts = self.parts.write().unwrap(); + parts.immutable = true; + // Freezes the index. + if let Some(index) = &parts.index { + index.freeze()?; + } + + Ok(()) + } + + /// Forks an immutable tree. Returns a mutable tree that inherits the index + /// and cache of this tree. + pub(crate) fn fork(&self, metadata: RegionMetadataRef) -> MergeTree { + if metadata.primary_key != self.metadata.primary_key { + // The priamry key is changed. We can't reuse fields. + return MergeTree::new(metadata, &self.config); + } + + let current_parts = self.parts.read().unwrap(); + let index = current_parts + .index + .as_ref() + .map(|index| Arc::new(index.fork())); + // New parts. + let parts = TreeParts { + immutable: false, + index, + data_buffer: DataBuffer::with_capacity(metadata.clone(), DATA_INIT_CAP), + }; + + MergeTree { + config: self.config.clone(), + metadata, + // We can reuse row codec. + row_codec: self.row_codec.clone(), + pk_id_cache: self.pk_id_cache.clone(), + parts: RwLock::new(parts), + } } + + fn write_with_key(&self, primary_key: &[u8], kv: KeyValue) -> Result<()> { + // Safety: `write()` ensures this is not None. + let cache = self.pk_id_cache.as_ref().unwrap(); + if let Some(pk_id) = cache.get(primary_key) { + // The pk is in the cache. + self.write_with_id(pk_id, kv); + return Ok(()); + } + + // The pk is not in the cache, we need to write the pk to the index. + let pk_id = self.write_primary_key(primary_key)?; + // Also writes the pk to the cache. + self.add_pk_to_cache(primary_key, pk_id); + // Writes data. + self.write_with_id(pk_id, kv); + + Ok(()) + } + + fn write_with_id(&self, pk_id: PkId, kv: KeyValue) { + let mut parts = self.parts.write().unwrap(); + assert!(!parts.immutable); + parts.data_buffer.write_row(pk_id, kv) + } + + fn write_primary_key(&self, key: &[u8]) -> Result { + let index = { + let parts = self.parts.read().unwrap(); + assert!(!parts.immutable); + // Safety: The region has primary keys. + parts.index.clone().unwrap() + }; + + index.write_primary_key(key) + } + + fn add_pk_to_cache(&self, primary_key: &[u8], pk_id: PkId) { + let Some(pk_id_cache) = &self.pk_id_cache else { + return; + }; + pk_id_cache.insert(primary_key.to_vec(), pk_id); + CACHE_BYTES + .with_label_values(&[PK_ID_TYPE]) + .add(pk_id_cache_weight(primary_key, &pk_id).into()) + } +} + +struct TreeParts { + /// Whether the tree is immutable. + immutable: bool, + /// Index part of the tree. If the region doesn't have a primary key, this field + /// is `None`. + // TODO(yingwen): Support freezing the index. + index: Option, + /// Data buffer of the tree. + data_buffer: DataBuffer, +} + +/// Maps primary key to [PkId]. +type PkIdCache = Cache, PkId>; + +fn pk_id_cache_weight(k: &[u8], _v: &PkId) -> u32 { + (k.len() + std::mem::size_of::()) as u32 +} + +fn new_cache(cache_size: u64) -> PkIdCache { + PkIdCache::builder() + .max_capacity(cache_size) + .weigher(|k, v| pk_id_cache_weight(k.as_slice(), v)) + .eviction_listener(|k, v, _cause| { + let size = pk_id_cache_weight(&k, &v); + CACHE_BYTES + .with_label_values(&[PK_ID_TYPE]) + .sub(size.into()); + }) + .build() }