From bfcb5d174117b6b59207ebcd945441e6ab806fb7 Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 21 Feb 2024 22:14:48 +0800 Subject: [PATCH 01/13] feat: write to a shard or a shard builder --- src/mito2/src/memtable/merge_tree/data.rs | 17 ++--- src/mito2/src/memtable/merge_tree/dict.rs | 42 ++++++++---- .../src/memtable/merge_tree/partition.rs | 30 ++++++--- src/mito2/src/memtable/merge_tree/shard.rs | 35 +++++----- .../src/memtable/merge_tree/shard_builder.rs | 65 ++++++++++++++----- src/mito2/src/memtable/merge_tree/tree.rs | 6 +- 6 files changed, 130 insertions(+), 65 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index 20224b8af23c..bf3e77cd791f 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -42,10 +42,13 @@ use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME}; use crate::error; use crate::error::Result; use crate::memtable::key_values::KeyValue; -use crate::memtable::merge_tree::{PkId, PkIndex}; +use crate::memtable::merge_tree::PkIndex; const PK_INDEX_COLUMN_NAME: &str = "__pk_index"; +/// Initial capacity for the data buffer. +pub(crate) const DATA_INIT_CAP: usize = 8; + /// Data part batches returns by `DataParts::read`. #[derive(Debug, Clone)] pub struct DataBatch<'a> { @@ -123,9 +126,9 @@ impl DataBuffer { } /// Writes a row to data buffer. - pub fn write_row(&mut self, pk_id: PkId, kv: KeyValue) { + pub fn write_row(&mut self, pk_index: PkIndex, kv: KeyValue) { self.ts_builder.push_value_ref(kv.timestamp()); - self.pk_index_builder.push(Some(pk_id.pk_index)); + self.pk_index_builder.push(Some(pk_index)); self.sequence_builder.push(Some(kv.sequence())); self.op_type_builder.push(Some(kv.op_type() as u8)); @@ -860,13 +863,7 @@ mod tests { ); for kv in kvs.iter() { - buffer.write_row( - PkId { - shard_id: 0, - pk_index, - }, - kv, - ); + buffer.write_row(pk_index, kv); } } diff --git a/src/mito2/src/memtable/merge_tree/dict.rs b/src/mito2/src/memtable/merge_tree/dict.rs index 43a53bd494f6..04091897ca49 100644 --- a/src/mito2/src/memtable/merge_tree/dict.rs +++ b/src/mito2/src/memtable/merge_tree/dict.rs @@ -61,19 +61,21 @@ impl KeyDictBuilder { self.pk_to_index.get(key).copied() } + /// Returns true if the builder is full. + pub fn is_full(&self) -> bool { + self.num_keys >= self.capacity + } + /// Adds the key to the builder and returns its index if the builder is not full. /// - /// Returns `None` if the builder is full. - pub fn try_insert_key(&mut self, key: &[u8], metrics: &mut WriteMetrics) -> Option { + /// # Panics + /// Panics if the builder is full. + pub fn insert_key(&mut self, key: &[u8], metrics: &mut WriteMetrics) -> PkIndex { + assert!(!self.is_full()); + if let Some(pk_index) = self.pk_to_index.get(key).copied() { // Already in the builder. - return Some(pk_index); - } - - // A new key. - if self.num_keys >= self.capacity { - // The builder is full. - return None; + return pk_index; } if self.key_buffer.len() >= MAX_KEYS_PER_BLOCK.into() { @@ -91,7 +93,7 @@ impl KeyDictBuilder { metrics.key_bytes += key.len() * 2; self.key_bytes_in_index += key.len(); - Some(pk_index) + pk_index } /// Memory size of the builder. @@ -220,6 +222,20 @@ impl KeyDict { let block_index = position / MAX_KEYS_PER_BLOCK; self.dict_blocks[block_index as usize].key_by_pk_index(position) } + + /// Gets the pk index by the key. + pub fn get_pk_index(&self, key: &[u8]) -> Option { + self.pk_to_index.get(key).copied() + } + + /// Sets the pk weights to sort a data part and replaces pk indices. + pub(crate) fn pk_weights_to_sort_data(&self) -> Vec { + let mut pk_weights = vec![0; self.key_positions.len()]; + for (weight, pk_index) in self.key_positions.iter().enumerate() { + pk_weights[*pk_index as usize] = weight as u16; + } + pk_weights + } } /// Buffer to store unsorted primary keys. @@ -364,7 +380,7 @@ mod tests { let mut last_pk_index = None; let mut metrics = WriteMetrics::default(); for key in &keys { - let pk_index = builder.try_insert_key(key, &mut metrics).unwrap(); + let pk_index = builder.insert_key(key, &mut metrics); last_pk_index = Some(pk_index); } assert_eq!(num_keys - 1, last_pk_index.unwrap()); @@ -397,9 +413,7 @@ mod tests { for i in 0..num_keys { // Each key is 5 bytes. let key = format!("{i:05}"); - builder - .try_insert_key(key.as_bytes(), &mut metrics) - .unwrap(); + builder.insert_key(key.as_bytes(), &mut metrics); } // num_keys * 5 * 2 assert_eq!(5130, metrics.key_bytes); diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs index 69c92ff69f3a..db8213cc7a43 100644 --- a/src/mito2/src/memtable/merge_tree/partition.rs +++ b/src/mito2/src/memtable/merge_tree/partition.rs @@ -26,6 +26,7 @@ use store_api::storage::ColumnId; use crate::error::Result; use crate::memtable::key_values::KeyValue; +use crate::memtable::merge_tree::data::{DataBuffer, DataParts, DATA_INIT_CAP}; use crate::memtable::merge_tree::metrics::WriteMetrics; use crate::memtable::merge_tree::shard::Shard; use crate::memtable::merge_tree::shard_builder::ShardBuilder; @@ -56,12 +57,15 @@ impl Partition { // Now we ensure one key only exists in one shard. if let Some(pk_id) = inner.find_key_in_shards(primary_key) { // Key already in shards. - return inner.write_to_shard(pk_id, key_value); + inner.write_to_shard(pk_id, key_value); + return Ok(()); } if inner.shard_builder.should_freeze() { let shard_id = inner.active_shard_id; - let shard = inner.shard_builder.finish(shard_id)?; + let metadata = inner.metadata.clone(); + // Safety: The builder is not empty. + let shard = inner.shard_builder.finish(shard_id, metadata)?.unwrap(); inner.active_shard_id += 1; inner.shards.push(shard); } @@ -69,18 +73,22 @@ impl Partition { // Write to the shard builder. inner .shard_builder - .write_with_key(primary_key, key_value, metrics)?; + .write_with_key(primary_key, key_value, metrics); Ok(()) } /// Writes to the partition without a primary key. - pub fn write_no_key(&self, key_value: KeyValue, metrics: &mut WriteMetrics) -> Result<()> { + pub fn write_no_key(&self, key_value: KeyValue) { let mut inner = self.inner.write().unwrap(); // If no primary key, always write to the first shard. if inner.shards.is_empty() { let shard_id = inner.active_shard_id; - inner.shards.push(Shard::new_no_dict(shard_id)); + let data_parts = DataParts { + active: DataBuffer::with_capacity(inner.metadata.clone(), DATA_INIT_CAP), + frozen: Vec::new(), + }; + inner.shards.push(Shard::new(shard_id, None, data_parts)); inner.active_shard_id += 1; } @@ -89,7 +97,7 @@ impl Partition { shard_id: inner.active_shard_id - 1, pk_index: 0, }; - inner.shards[0].write_key_value(pk_id, key_value, metrics) + inner.shards[0].write_key_value(pk_id, key_value); } /// Scans data in the partition. @@ -160,6 +168,7 @@ pub type PartitionRef = Arc; /// /// A key only exists in one shard. struct Inner { + metadata: RegionMetadataRef, /// Shard whose dictionary is active. shard_builder: ShardBuilder, active_shard_id: ShardId, @@ -178,7 +187,12 @@ impl Inner { None } - fn write_to_shard(&mut self, _pk_id: PkId, _key_value: KeyValue) -> Result<()> { - unimplemented!() + fn write_to_shard(&mut self, pk_id: PkId, key_value: KeyValue) { + for shard in &mut self.shards { + if shard.shard_id == pk_id.shard_id { + shard.write_key_value(pk_id, key_value); + return; + } + } } } diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/merge_tree/shard.rs index 9eceb4920130..39a65a4e7f17 100644 --- a/src/mito2/src/memtable/merge_tree/shard.rs +++ b/src/mito2/src/memtable/merge_tree/shard.rs @@ -19,16 +19,14 @@ use std::collections::HashSet; use common_recordbatch::filter::SimpleFilterEvaluator; use store_api::storage::ColumnId; -use crate::error::Result; use crate::memtable::key_values::KeyValue; use crate::memtable::merge_tree::data::DataParts; use crate::memtable::merge_tree::dict::KeyDictRef; -use crate::memtable::merge_tree::metrics::WriteMetrics; use crate::memtable::merge_tree::{PkId, ShardId}; /// Shard stores data related to the same key dictionary. pub struct Shard { - shard_id: ShardId, + pub(crate) shard_id: ShardId, /// Key dictionary of the shard. `None` if the schema of the tree doesn't have a primary key. key_dict: Option, /// Data in the shard. @@ -36,24 +34,31 @@ pub struct Shard { } impl Shard { - /// Returns a shard without dictionary. - pub fn new_no_dict(_shard_id: ShardId) -> Shard { - unimplemented!() + /// Returns a new shard. + pub fn new(shard_id: ShardId, key_dict: Option, data_parts: DataParts) -> Shard { + Shard { + shard_id, + key_dict, + data_parts, + } } /// Returns the pk id of the key if it exists. - pub fn find_key(&self, _key: &[u8]) -> Option { - unimplemented!() + pub fn find_key(&self, key: &[u8]) -> Option { + let key_dict = self.key_dict.as_ref()?; + let pk_index = key_dict.get_pk_index(key)?; + + Some(PkId { + shard_id: self.shard_id, + pk_index, + }) } /// Writes a key value into the shard. - pub fn write_key_value( - &mut self, - _pk_id: PkId, - _key_value: KeyValue, - _metrics: &mut WriteMetrics, - ) -> Result<()> { - unimplemented!() + pub fn write_key_value(&mut self, pk_id: PkId, key_value: KeyValue) { + debug_assert_eq!(self.shard_id, pk_id.shard_id); + + self.data_parts.active.write_row(pk_id.pk_index, key_value); } /// Scans the shard. diff --git a/src/mito2/src/memtable/merge_tree/shard_builder.rs b/src/mito2/src/memtable/merge_tree/shard_builder.rs index c8d78029043c..ade63991accc 100644 --- a/src/mito2/src/memtable/merge_tree/shard_builder.rs +++ b/src/mito2/src/memtable/merge_tree/shard_builder.rs @@ -14,13 +14,17 @@ //! Builder of a shard. +use std::sync::Arc; + +use store_api::metadata::RegionMetadataRef; + use crate::error::Result; use crate::memtable::key_values::KeyValue; -use crate::memtable::merge_tree::data::DataBuffer; +use crate::memtable::merge_tree::data::{DataBuffer, DataParts, DATA_INIT_CAP}; use crate::memtable::merge_tree::dict::KeyDictBuilder; use crate::memtable::merge_tree::metrics::WriteMetrics; use crate::memtable::merge_tree::shard::Shard; -use crate::memtable::merge_tree::ShardId; +use crate::memtable::merge_tree::{MergeTreeConfig, ShardId}; /// Builder to write keys and data to a shard that the key dictionary /// is still active. @@ -29,36 +33,67 @@ pub struct ShardBuilder { dict_builder: KeyDictBuilder, /// Buffer to store data. data_buffer: DataBuffer, - /// Max keys in an index shard. - index_max_keys_per_shard: usize, /// Number of rows to freeze a data part. data_freeze_threshold: usize, } impl ShardBuilder { + /// Returns a new builder. + pub fn new(metadata: RegionMetadataRef, config: &MergeTreeConfig) -> ShardBuilder { + ShardBuilder { + dict_builder: KeyDictBuilder::new(config.index_max_keys_per_shard), + data_buffer: DataBuffer::with_capacity(metadata, DATA_INIT_CAP), + data_freeze_threshold: config.data_freeze_threshold, + } + } + /// Write a key value with its encoded primary key. - pub fn write_with_key( - &mut self, - _key: &[u8], - _key_value: KeyValue, - _metrics: &mut WriteMetrics, - ) -> Result<()> { - unimplemented!() + pub fn write_with_key(&mut self, key: &[u8], key_value: KeyValue, metrics: &mut WriteMetrics) { + // Safety: we check whether the builder need to freeze before. + let pk_index = self.dict_builder.insert_key(key, metrics); + self.data_buffer.write_row(pk_index, key_value); } /// Returns true if the builder is empty. pub fn is_empty(&self) -> bool { - unimplemented!() + self.data_buffer.is_empty() } /// Returns true if the builder need to freeze. pub fn should_freeze(&self) -> bool { - unimplemented!() + self.dict_builder.is_full() || self.data_buffer.num_rows() == self.data_freeze_threshold } /// Builds a new shard and resets the builder. - pub fn finish(&mut self, _shard_id: ShardId) -> Result { - unimplemented!() + pub fn finish( + &mut self, + shard_id: ShardId, + metadata: RegionMetadataRef, + ) -> Result> { + if self.is_empty() { + return Ok(None); + } + + let key_dict = self.dict_builder.finish(); + let data_part = match &key_dict { + Some(dict) => { + let pk_weights = dict.pk_weights_to_sort_data(); + self.data_buffer.freeze(&pk_weights)? + } + None => { + let pk_weights = [0]; + self.data_buffer.freeze(&pk_weights)? + } + }; + + // build data parts. + let data_parts = DataParts { + active: DataBuffer::with_capacity(metadata, DATA_INIT_CAP), + frozen: vec![data_part], + }; + let key_dict = key_dict.map(Arc::new); + + Ok(Some(Shard::new(shard_id, key_dict, data_parts))) } /// Scans the shard builder diff --git a/src/mito2/src/memtable/merge_tree/tree.rs b/src/mito2/src/memtable/merge_tree/tree.rs index d9c26611f362..d2852cb23ed4 100644 --- a/src/mito2/src/memtable/merge_tree/tree.rs +++ b/src/mito2/src/memtable/merge_tree/tree.rs @@ -103,7 +103,7 @@ impl MergeTree { if !has_pk { // No primary key. - self.write_no_key(kv, metrics)?; + self.write_no_key(kv); continue; } @@ -236,11 +236,11 @@ impl MergeTree { partition.write_with_key(primary_key, key_value, metrics) } - fn write_no_key(&self, key_value: KeyValue, metrics: &mut WriteMetrics) -> Result<()> { + fn write_no_key(&self, key_value: KeyValue) { let partition_key = Partition::get_partition_key(&key_value, self.is_partitioned); let partition = self.get_or_create_partition(partition_key); - partition.write_no_key(key_value, metrics) + partition.write_no_key(key_value) } fn get_or_create_partition(&self, partition_key: PartitionKey) -> PartitionRef { From 8646c42977302d47828e3cc94489123fdf7d9d42 Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 21 Feb 2024 22:45:32 +0800 Subject: [PATCH 02/13] feat: freeze and fork for partition and shards --- src/mito2/src/memtable/merge_tree/dict.rs | 7 +++ .../src/memtable/merge_tree/partition.rs | 62 ++++++++++++++++--- src/mito2/src/memtable/merge_tree/shard.rs | 23 ++++++- .../src/memtable/merge_tree/shard_builder.rs | 2 +- src/mito2/src/memtable/merge_tree/tree.rs | 2 +- 5 files changed, 86 insertions(+), 10 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/dict.rs b/src/mito2/src/memtable/merge_tree/dict.rs index 04091897ca49..c2161cb7b30a 100644 --- a/src/mito2/src/memtable/merge_tree/dict.rs +++ b/src/mito2/src/memtable/merge_tree/dict.rs @@ -131,6 +131,7 @@ impl KeyDictBuilder { pk_to_index, dict_blocks: std::mem::take(&mut self.dict_blocks), key_positions, + key_bytes_in_index: self.key_bytes_in_index, }) } @@ -208,6 +209,7 @@ pub struct KeyDict { dict_blocks: Vec, /// Maps pk index to position of the key in [Self::dict_blocks]. key_positions: Vec, + key_bytes_in_index: usize, } pub type KeyDictRef = Arc; @@ -236,6 +238,11 @@ impl KeyDict { } pk_weights } + + /// Returns the shared memory size. + pub(crate) fn shared_memory_size(&self) -> usize { + self.key_bytes_in_index + } } /// Buffer to store unsorted primary keys. diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs index db8213cc7a43..82b1bb8133b6 100644 --- a/src/mito2/src/memtable/merge_tree/partition.rs +++ b/src/mito2/src/memtable/merge_tree/partition.rs @@ -42,8 +42,18 @@ pub struct Partition { impl Partition { /// Creates a new partition. - pub fn new(_metadata: RegionMetadataRef, _config: &MergeTreeConfig) -> Self { - unimplemented!() + pub fn new(metadata: RegionMetadataRef, config: &MergeTreeConfig) -> Self { + let shard_builder = ShardBuilder::new(metadata.clone(), config); + + Partition { + inner: RwLock::new(Inner { + metadata, + shard_builder, + active_shard_id: 0, + shards: Vec::new(), + num_rows: 0, + }), + } } /// Writes to the partition with a primary key. @@ -74,6 +84,7 @@ impl Partition { inner .shard_builder .write_with_key(primary_key, key_value, metrics); + inner.num_rows += 1; Ok(()) } @@ -83,6 +94,7 @@ impl Partition { let mut inner = self.inner.write().unwrap(); // If no primary key, always write to the first shard. if inner.shards.is_empty() { + // Create the shard if it doesn't exist. let shard_id = inner.active_shard_id; let data_parts = DataParts { active: DataBuffer::with_capacity(inner.metadata.clone(), DATA_INIT_CAP), @@ -98,6 +110,7 @@ impl Partition { pk_index: 0, }; inner.shards[0].write_key_value(pk_id, key_value); + inner.num_rows += 1; } /// Scans data in the partition. @@ -111,22 +124,55 @@ impl Partition { /// Freezes the partition. pub fn freeze(&self) -> Result<()> { - unimplemented!() + let mut inner = self.inner.write().unwrap(); + // TODO(yingwen): wrap a method to bump shard id and create shard? + let shard_id = inner.active_shard_id; + let metadata = inner.metadata.clone(); + let Some(shard) = inner.shard_builder.finish(shard_id, metadata)? else { + return Ok(()); + }; + inner.active_shard_id += 1; + + inner.shards.push(shard); + Ok(()) } /// Forks the partition. - pub fn fork(&self, _metadata: &RegionMetadataRef) -> Partition { - unimplemented!() + pub fn fork(&self, metadata: &RegionMetadataRef, config: &MergeTreeConfig) -> Partition { + let inner = self.inner.read().unwrap(); + // TODO(yingwen): TTL or evict shards. + let shard_builder = ShardBuilder::new(metadata.clone(), config); + let shards = inner + .shards + .iter() + .map(|shard| shard.fork(metadata.clone())) + .collect(); + + Partition { + inner: RwLock::new(Inner { + metadata: metadata.clone(), + shard_builder, + active_shard_id: inner.active_shard_id, + shards, + num_rows: 0, + }), + } } /// Returns true if the partition has data. pub fn has_data(&self) -> bool { - unimplemented!() + let inner = self.inner.read().unwrap(); + inner.num_rows > 0 } /// Returns shared memory size of the partition. pub fn shared_memory_size(&self) -> usize { - unimplemented!() + let inner = self.inner.read().unwrap(); + inner + .shards + .iter() + .map(|shard| shard.shared_memory_size()) + .sum() } /// Get partition key from the key value. @@ -174,6 +220,7 @@ struct Inner { active_shard_id: ShardId, /// Shards with frozon dictionary. shards: Vec, + num_rows: usize, } impl Inner { @@ -191,6 +238,7 @@ impl Inner { for shard in &mut self.shards { if shard.shard_id == pk_id.shard_id { shard.write_key_value(pk_id, key_value); + self.num_rows += 1; return; } } diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/merge_tree/shard.rs index 39a65a4e7f17..689f5c908c6f 100644 --- a/src/mito2/src/memtable/merge_tree/shard.rs +++ b/src/mito2/src/memtable/merge_tree/shard.rs @@ -17,10 +17,11 @@ use std::collections::HashSet; use common_recordbatch::filter::SimpleFilterEvaluator; +use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; use crate::memtable::key_values::KeyValue; -use crate::memtable::merge_tree::data::DataParts; +use crate::memtable::merge_tree::data::{DataBuffer, DataParts, DATA_INIT_CAP}; use crate::memtable::merge_tree::dict::KeyDictRef; use crate::memtable::merge_tree::{PkId, ShardId}; @@ -69,6 +70,26 @@ impl Shard { ) -> ShardReader { unimplemented!() } + + /// Returns the memory size of the shard part. + pub fn shared_memory_size(&self) -> usize { + self.key_dict + .as_ref() + .map(|dict| dict.shared_memory_size()) + .unwrap_or(0) + } + + /// Forks a shard. + pub fn fork(&self, metadata: RegionMetadataRef) -> Shard { + Shard { + shard_id: self.shard_id, + key_dict: self.key_dict.clone(), + data_parts: DataParts { + active: DataBuffer::with_capacity(metadata, DATA_INIT_CAP), + frozen: Vec::new(), + }, + } + } } /// Reader to read rows in a shard. diff --git a/src/mito2/src/memtable/merge_tree/shard_builder.rs b/src/mito2/src/memtable/merge_tree/shard_builder.rs index ade63991accc..a1d5bcb23702 100644 --- a/src/mito2/src/memtable/merge_tree/shard_builder.rs +++ b/src/mito2/src/memtable/merge_tree/shard_builder.rs @@ -96,7 +96,7 @@ impl ShardBuilder { Ok(Some(Shard::new(shard_id, key_dict, data_parts))) } - /// Scans the shard builder + /// Scans the shard builder. pub fn scan(&mut self, _shard_id: ShardId) -> Result { unimplemented!() } diff --git a/src/mito2/src/memtable/merge_tree/tree.rs b/src/mito2/src/memtable/merge_tree/tree.rs index d2852cb23ed4..4ae7d197b2e7 100644 --- a/src/mito2/src/memtable/merge_tree/tree.rs +++ b/src/mito2/src/memtable/merge_tree/tree.rs @@ -202,7 +202,7 @@ impl MergeTree { } // Only fork partitions that have data. - let forked_part = part.fork(&metadata); + let forked_part = part.fork(&metadata, &self.config); forked.insert(*part_key, Arc::new(forked_part)); } From 35a941e0854688e8131693e1a11157322f9f5999 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 22 Feb 2024 11:07:31 +0800 Subject: [PATCH 03/13] chore: shard builder --- src/mito2/src/memtable/merge_tree/shard.rs | 7 ++----- src/mito2/src/memtable/merge_tree/shard_builder.rs | 11 ++++++++++- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/merge_tree/shard.rs index 689f5c908c6f..89fba01bbee9 100644 --- a/src/mito2/src/memtable/merge_tree/shard.rs +++ b/src/mito2/src/memtable/merge_tree/shard.rs @@ -63,11 +63,8 @@ impl Shard { } /// Scans the shard. - pub fn scan( - &self, - _projection: &HashSet, - _filters: &[SimpleFilterEvaluator], - ) -> ShardReader { + // TODO(yingwen): Push down projection to data parts. + pub fn scan(&self) -> ShardReader { unimplemented!() } diff --git a/src/mito2/src/memtable/merge_tree/shard_builder.rs b/src/mito2/src/memtable/merge_tree/shard_builder.rs index a1d5bcb23702..b94a0085f2f2 100644 --- a/src/mito2/src/memtable/merge_tree/shard_builder.rs +++ b/src/mito2/src/memtable/merge_tree/shard_builder.rs @@ -14,9 +14,12 @@ //! Builder of a shard. +use std::collections::HashSet; use std::sync::Arc; +use common_recordbatch::filter::SimpleFilterEvaluator; use store_api::metadata::RegionMetadataRef; +use store_api::storage::ColumnId; use crate::error::Result; use crate::memtable::key_values::KeyValue; @@ -97,10 +100,16 @@ impl ShardBuilder { } /// Scans the shard builder. - pub fn scan(&mut self, _shard_id: ShardId) -> Result { + pub fn scan( + &mut self, + _projection: &HashSet, + _filters: &[SimpleFilterEvaluator], + ) -> Result { unimplemented!() } } /// Reader to scan a shard. builder. pub struct ShardBuilderReader {} + +// TODO(yingwen): Can we use generic for data reader? From 27335726bc2c0631fdf379027fb462bf08f404a2 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 22 Feb 2024 11:07:46 +0800 Subject: [PATCH 04/13] chore: change dict reader to support random access --- src/mito2/src/memtable/merge_tree/dict.rs | 39 ++++++++++------------- 1 file changed, 16 insertions(+), 23 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/dict.rs b/src/mito2/src/memtable/merge_tree/dict.rs index c2161cb7b30a..b17dd7798910 100644 --- a/src/mito2/src/memtable/merge_tree/dict.rs +++ b/src/mito2/src/memtable/merge_tree/dict.rs @@ -135,8 +135,8 @@ impl KeyDictBuilder { }) } - /// Scans the builder. - pub fn scan(&self) -> DictBuilderReader { + /// Reads the builder. + pub fn read(&self) -> DictBuilderReader { let sorted_pk_indices = self.pk_to_index.values().copied().collect(); let block = self.key_buffer.finish_cloned(); let mut blocks = Vec::with_capacity(self.dict_blocks.len() + 1); @@ -165,27 +165,20 @@ impl DictBuilderReader { } } - /// Returns true if the item in the reader is valid. - pub fn is_valid(&self) -> bool { - self.offset < self.sorted_pk_indices.len() + /// Returns the number of keys. + pub fn num_keys(&self) -> usize { + self.sorted_pk_indices.len() } - /// Returns current key. - pub fn current_key(&self) -> &[u8] { - let pk_index = self.current_pk_index(); - self.key_by_pk_index(pk_index) - } - - /// Returns current [PkIndex] of the key. - pub fn current_pk_index(&self) -> PkIndex { - assert!(self.is_valid()); - self.sorted_pk_indices[self.offset] + /// Gets the i-th pk index. + pub fn pk_index(&self, offset: usize) -> PkIndex { + self.sorted_pk_indices[offset] } - /// Advances the reader. - pub fn next(&mut self) { - assert!(self.is_valid()); - self.offset += 1; + /// Gets the i-th key. + pub fn key(&self, offset: usize) -> &[u8] { + let pk_index = self.pk_index(offset); + self.key_by_pk_index(pk_index) } /// Returns pk indices sorted by keys. @@ -387,6 +380,7 @@ mod tests { let mut last_pk_index = None; let mut metrics = WriteMetrics::default(); for key in &keys { + assert!(!builder.is_full()); let pk_index = builder.insert_key(key, &mut metrics); last_pk_index = Some(pk_index); } @@ -402,10 +396,9 @@ mod tests { expect.sort_unstable_by(|a, b| a.0.cmp(&b.0)); let mut result = Vec::with_capacity(expect.len()); - let mut reader = builder.scan(); - while reader.is_valid() { - result.push((reader.current_key().to_vec(), reader.current_pk_index())); - reader.next(); + let reader = builder.read(); + for i in 0..reader.num_keys() { + result.push((reader.key(i).to_vec(), reader.pk_index(i))); } assert_eq!(expect, result); } From d664738aecb2bddb6c4d78e11a899d93c0edfc5b Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 22 Feb 2024 12:15:09 +0800 Subject: [PATCH 05/13] test: test write shard --- src/mito2/src/memtable/merge_tree/data.rs | 1 + .../src/memtable/merge_tree/partition.rs | 2 +- src/mito2/src/memtable/merge_tree/shard.rs | 106 +++++++++++++++++- src/mito2/src/test_util/memtable_util.rs | 40 +++++++ 4 files changed, 143 insertions(+), 6 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index bf3e77cd791f..b8a01bce7c64 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -531,6 +531,7 @@ impl<'a> DataPartEncoder<'a> { } } +// TODO(yingwen): rm pub(crate) and add a new method. /// Data parts under a shard. pub struct DataParts { /// The active writing buffer. diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs index 82b1bb8133b6..854da5c45f05 100644 --- a/src/mito2/src/memtable/merge_tree/partition.rs +++ b/src/mito2/src/memtable/merge_tree/partition.rs @@ -226,7 +226,7 @@ struct Inner { impl Inner { fn find_key_in_shards(&self, primary_key: &[u8]) -> Option { for shard in &self.shards { - if let Some(pkid) = shard.find_key(primary_key) { + if let Some(pkid) = shard.find_id_by_key(primary_key) { return Some(pkid); } } diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/merge_tree/shard.rs index 89fba01bbee9..ff47e63b7b27 100644 --- a/src/mito2/src/memtable/merge_tree/shard.rs +++ b/src/mito2/src/memtable/merge_tree/shard.rs @@ -14,11 +14,7 @@ //! Shard in a partition. -use std::collections::HashSet; - -use common_recordbatch::filter::SimpleFilterEvaluator; use store_api::metadata::RegionMetadataRef; -use store_api::storage::ColumnId; use crate::memtable::key_values::KeyValue; use crate::memtable::merge_tree::data::{DataBuffer, DataParts, DATA_INIT_CAP}; @@ -45,7 +41,7 @@ impl Shard { } /// Returns the pk id of the key if it exists. - pub fn find_key(&self, key: &[u8]) -> Option { + pub fn find_id_by_key(&self, key: &[u8]) -> Option { let key_dict = self.key_dict.as_ref()?; let pk_index = key_dict.get_pk_index(key)?; @@ -91,3 +87,103 @@ impl Shard { /// Reader to read rows in a shard. pub struct ShardReader {} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + use crate::memtable::merge_tree::dict::KeyDictBuilder; + use crate::memtable::merge_tree::metrics::WriteMetrics; + use crate::memtable::merge_tree::PkIndex; + use crate::memtable::KeyValues; + use crate::test_util::memtable_util::{ + build_key_values_with_ts_seq_values, encode_key, encode_key_by_kv, encode_keys, + metadata_for_test, + }; + + fn input_with_key(metadata: &RegionMetadataRef) -> Vec { + vec![ + build_key_values_with_ts_seq_values( + metadata, + "shard".to_string(), + 3, + [30, 31].into_iter(), + [Some(0.0), Some(1.0)].into_iter(), + 0, + ), + build_key_values_with_ts_seq_values( + metadata, + "shard".to_string(), + 1, + [10, 11].into_iter(), + [Some(0.0), Some(1.0)].into_iter(), + 1, + ), + build_key_values_with_ts_seq_values( + metadata, + "shard".to_string(), + 2, + [20, 21].into_iter(), + [Some(0.0), Some(1.0)].into_iter(), + 2, + ), + ] + } + + fn new_shard_with_dict( + shard_id: ShardId, + metadata: RegionMetadataRef, + input: &[KeyValues], + ) -> Shard { + let mut dict_builder = KeyDictBuilder::new(1024); + let mut metrics = WriteMetrics::default(); + let mut keys = Vec::with_capacity(input.len()); + for kvs in input { + encode_keys(&metadata, kvs, &mut keys); + } + for key in &keys { + dict_builder.insert_key(&key, &mut metrics); + } + + let dict = dict_builder.finish().unwrap(); + let data_parts = DataParts { + active: DataBuffer::with_capacity(metadata, DATA_INIT_CAP), + frozen: vec![], + }; + + Shard::new(shard_id, Some(Arc::new(dict)), data_parts) + } + + #[test] + fn test_shard_find_by_key() { + let metadata = metadata_for_test(); + let input = input_with_key(&metadata); + let shard = new_shard_with_dict(8, metadata, &input); + for i in 0..input.len() { + let key = encode_key("shard", i as i64); + assert_eq!( + PkId { + shard_id: 8, + pk_index: i as PkIndex, + }, + shard.find_id_by_key(&key).unwrap() + ); + } + assert!(shard.find_id_by_key(&encode_key("shard", 100)).is_none()); + } + + #[test] + fn test_write_shard() { + let metadata = metadata_for_test(); + let input = input_with_key(&metadata); + let mut shard = new_shard_with_dict(8, metadata, &input); + for key_values in &input { + for kv in key_values.iter() { + let key = encode_key_by_kv(&kv); + let pk_id = shard.find_id_by_key(&key).unwrap(); + shard.write_key_value(pk_id, kv); + } + } + } +} diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs index 7e761cad771a..6075da97f8a8 100644 --- a/src/mito2/src/test_util/memtable_util.rs +++ b/src/mito2/src/test_util/memtable_util.rs @@ -22,15 +22,18 @@ use api::v1::value::ValueData; use api::v1::{Row, Rows, SemanticType}; use datatypes::data_type::ConcreteDataType; use datatypes::schema::ColumnSchema; +use datatypes::value::ValueRef; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef}; use store_api::storage::{ColumnId, RegionId, SequenceNumber}; use table::predicate::Predicate; use crate::error::Result; +use crate::memtable::key_values::KeyValue; use crate::memtable::{ BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRef, MemtableStats, }; +use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; /// Empty memtable for test. #[derive(Debug, Default)] @@ -198,3 +201,40 @@ pub(crate) fn build_key_values_with_ts_seq_values( }; KeyValues::new(schema.as_ref(), mutation).unwrap() } + +/// Encode keys. +pub(crate) fn encode_keys( + metadata: &RegionMetadataRef, + key_values: &KeyValues, + keys: &mut Vec>, +) { + let row_codec = McmpRowCodec::new( + metadata + .primary_key_columns() + .map(|c| SortField::new(c.column_schema.data_type.clone())) + .collect(), + ); + for kv in key_values.iter() { + let key = row_codec.encode(kv.primary_keys()).unwrap(); + keys.push(key); + } +} + +/// Encode one key. +pub(crate) fn encode_key(k0: &str, k1: i64) -> Vec { + let row_codec = McmpRowCodec::new(vec![ + SortField::new(ConcreteDataType::string_datatype()), + SortField::new(ConcreteDataType::int64_datatype()), + ]); + let key = [ValueRef::String(k0), ValueRef::Int64(k1)]; + row_codec.encode(key.into_iter()).unwrap() +} + +/// Encode one key. +pub(crate) fn encode_key_by_kv(key_value: &KeyValue) -> Vec { + let row_codec = McmpRowCodec::new(vec![ + SortField::new(ConcreteDataType::string_datatype()), + SortField::new(ConcreteDataType::int64_datatype()), + ]); + row_codec.encode(key_value.primary_keys()).unwrap() +} From 72e40e2b79cfde35f7c16b5fcc94049360b6d6ab Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 22 Feb 2024 14:02:13 +0800 Subject: [PATCH 06/13] test: test write --- src/mito2/src/memtable/merge_tree/shard.rs | 12 +-- .../src/memtable/merge_tree/shard_builder.rs | 85 ++++++++++++++++++- 2 files changed, 90 insertions(+), 7 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/merge_tree/shard.rs index ff47e63b7b27..9221d2e9df43 100644 --- a/src/mito2/src/memtable/merge_tree/shard.rs +++ b/src/mito2/src/memtable/merge_tree/shard.rs @@ -107,24 +107,24 @@ mod tests { build_key_values_with_ts_seq_values( metadata, "shard".to_string(), - 3, - [30, 31].into_iter(), + 2, + [20, 21].into_iter(), [Some(0.0), Some(1.0)].into_iter(), 0, ), build_key_values_with_ts_seq_values( metadata, "shard".to_string(), - 1, - [10, 11].into_iter(), + 0, + [0, 1].into_iter(), [Some(0.0), Some(1.0)].into_iter(), 1, ), build_key_values_with_ts_seq_values( metadata, "shard".to_string(), - 2, - [20, 21].into_iter(), + 1, + [10, 11].into_iter(), [Some(0.0), Some(1.0)].into_iter(), 2, ), diff --git a/src/mito2/src/memtable/merge_tree/shard_builder.rs b/src/mito2/src/memtable/merge_tree/shard_builder.rs index b94a0085f2f2..7910513c2ec5 100644 --- a/src/mito2/src/memtable/merge_tree/shard_builder.rs +++ b/src/mito2/src/memtable/merge_tree/shard_builder.rs @@ -109,7 +109,90 @@ impl ShardBuilder { } } -/// Reader to scan a shard. builder. +/// Reader to scan a shard builder. pub struct ShardBuilderReader {} // TODO(yingwen): Can we use generic for data reader? + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + use crate::memtable::merge_tree::dict::KeyDictBuilder; + use crate::memtable::merge_tree::metrics::WriteMetrics; + use crate::memtable::KeyValues; + use crate::test_util::memtable_util::{ + build_key_values_with_ts_seq_values, encode_key_by_kv, encode_keys, metadata_for_test, + }; + + fn input_with_key(metadata: &RegionMetadataRef) -> Vec { + vec![ + build_key_values_with_ts_seq_values( + metadata, + "shard_builder".to_string(), + 3, + [30, 31].into_iter(), + [Some(0.0), Some(1.0)].into_iter(), + 0, + ), + build_key_values_with_ts_seq_values( + metadata, + "shard_builder".to_string(), + 1, + [10, 11].into_iter(), + [Some(0.0), Some(1.0)].into_iter(), + 1, + ), + build_key_values_with_ts_seq_values( + metadata, + "shard_builder".to_string(), + 2, + [20, 21].into_iter(), + [Some(0.0), Some(1.0)].into_iter(), + 2, + ), + ] + } + + fn new_shard_builder( + shard_id: ShardId, + metadata: RegionMetadataRef, + input: &[KeyValues], + ) -> Shard { + let mut dict_builder = KeyDictBuilder::new(1024); + let mut metrics = WriteMetrics::default(); + let mut keys = Vec::with_capacity(input.len()); + for kvs in input { + encode_keys(&metadata, kvs, &mut keys); + } + for key in &keys { + dict_builder.insert_key(&key, &mut metrics); + } + + let dict = dict_builder.finish().unwrap(); + let data_parts = DataParts { + active: DataBuffer::with_capacity(metadata, DATA_INIT_CAP), + frozen: vec![], + }; + + Shard::new(shard_id, Some(Arc::new(dict)), data_parts) + } + + #[test] + fn test_write_shard_builder() { + let metadata = metadata_for_test(); + let input = input_with_key(&metadata); + let config = MergeTreeConfig::default(); + let mut shard_builder = ShardBuilder::new(metadata.clone(), &config); + let mut metrics = WriteMetrics::default(); + + assert!(shard_builder.is_empty()); + for key_values in &input { + for kv in key_values.iter() { + let key = encode_key_by_kv(&kv); + shard_builder.write_with_key(&key, kv, &mut metrics); + } + } + } +} From ec78bf70a652baba2b5097311f5db93ff8dfef16 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 22 Feb 2024 14:36:24 +0800 Subject: [PATCH 07/13] test: test memtable --- src/mito2/src/memtable/merge_tree.rs | 82 ++++++++++++++++++++++ src/mito2/src/memtable/merge_tree/shard.rs | 2 +- src/mito2/src/test_util/memtable_util.rs | 49 ++++++++++--- 3 files changed, 122 insertions(+), 11 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree.rs b/src/mito2/src/memtable/merge_tree.rs index 8a0a6031a0bf..cbaa92cea403 100644 --- a/src/mito2/src/memtable/merge_tree.rs +++ b/src/mito2/src/memtable/merge_tree.rs @@ -260,3 +260,85 @@ impl MemtableBuilder for MergeTreeMemtableBuilder { )) } } + +#[cfg(test)] +mod tests { + use common_time::Timestamp; + + use super::*; + use crate::test_util::memtable_util; + + #[test] + fn test_memtable_sorted_input() { + write_sorted_input(true); + write_sorted_input(false); + } + + fn write_sorted_input(has_pk: bool) { + let metadata = if has_pk { + memtable_util::metadata_with_primary_key(vec![1, 0], true) + } else { + memtable_util::metadata_with_primary_key(vec![], false) + }; + let timestamps = (0..100).collect::>(); + let kvs = + memtable_util::build_key_values(&metadata, "hello".to_string(), 42, ×tamps, 1); + let memtable = MergeTreeMemtable::new(1, metadata, None, &MergeTreeConfig::default()); + memtable.write(&kvs).unwrap(); + + // TODO(yingwen): Test iter. + + let stats = memtable.stats(); + assert!(stats.bytes_allocated() > 0); + assert_eq!( + Some(( + Timestamp::new_millisecond(0), + Timestamp::new_millisecond(99) + )), + stats.time_range() + ); + } + + #[test] + fn test_memtable_unsorted_input() { + write_iter_unsorted_input(true); + write_iter_unsorted_input(false); + } + + fn write_iter_unsorted_input(has_pk: bool) { + let metadata = if has_pk { + memtable_util::metadata_with_primary_key(vec![1, 0], true) + } else { + memtable_util::metadata_with_primary_key(vec![], false) + }; + let memtable = + MergeTreeMemtable::new(1, metadata.clone(), None, &MergeTreeConfig::default()); + + let kvs = memtable_util::build_key_values( + &metadata, + "hello".to_string(), + 0, + &[1, 3, 7, 5, 6], + 0, // sequence 0, 1, 2, 3, 4 + ); + memtable.write(&kvs).unwrap(); + + let kvs = memtable_util::build_key_values( + &metadata, + "hello".to_string(), + 0, + &[5, 2, 4, 0, 7], + 5, // sequence 5, 6, 7, 8, 9 + ); + memtable.write(&kvs).unwrap(); + + // TODO(yingwen): Test iter. + + let stats = memtable.stats(); + assert!(stats.bytes_allocated() > 0); + assert_eq!( + Some((Timestamp::new_millisecond(0), Timestamp::new_millisecond(7))), + stats.time_range() + ); + } +} diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/merge_tree/shard.rs index 9221d2e9df43..4fc041751dc9 100644 --- a/src/mito2/src/memtable/merge_tree/shard.rs +++ b/src/mito2/src/memtable/merge_tree/shard.rs @@ -161,7 +161,7 @@ mod tests { let input = input_with_key(&metadata); let shard = new_shard_with_dict(8, metadata, &input); for i in 0..input.len() { - let key = encode_key("shard", i as i64); + let key = encode_key("shard", i as u32); assert_eq!( PkId { shard_id: 8, diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs index 6075da97f8a8..584f21350719 100644 --- a/src/mito2/src/test_util/memtable_util.rs +++ b/src/mito2/src/test_util/memtable_util.rs @@ -96,14 +96,19 @@ impl MemtableBuilder for EmptyMemtableBuilder { /// /// The schema is `k0, k1, ts, v0, v1` and pk is `k0, k1`. pub(crate) fn metadata_for_test() -> RegionMetadataRef { - metadata_with_primary_key(vec![0, 1]) + metadata_with_primary_key(vec![0, 1], false) } /// Creates a region metadata to test memtable and specific primary key. /// -/// The schema is `k0, k1, ts, v0, v1`. -pub(crate) fn metadata_with_primary_key(primary_key: Vec) -> RegionMetadataRef { +/// If `enable_table_id` is false, the schema is `k0, k1, ts, v0, v1`. +/// If `enable_table_id` is true, the schema is `k0, __table_id, ts, v0, v1`. +pub(crate) fn metadata_with_primary_key( + primary_key: Vec, + enable_table_id: bool, +) -> RegionMetadataRef { let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456)); + let maybe_table_id = if enable_table_id { "table_id" } else { "k1" }; builder .push_column_metadata(ColumnMetadata { column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false), @@ -111,7 +116,11 @@ pub(crate) fn metadata_with_primary_key(primary_key: Vec) -> RegionMet column_id: 0, }) .push_column_metadata(ColumnMetadata { - column_schema: ColumnSchema::new("k1", ConcreteDataType::int64_datatype(), false), + column_schema: ColumnSchema::new( + maybe_table_id, + ConcreteDataType::uint32_datatype(), + false, + ), semantic_type: semantic_type_of_column(1, &primary_key), column_id: 1, }) @@ -147,11 +156,31 @@ fn semantic_type_of_column(column_id: ColumnId, primary_key: &[ColumnId]) -> Sem } } +/// Builds key values with `len` rows for test. +pub(crate) fn build_key_values( + schema: &RegionMetadataRef, + k0: String, + k1: u32, + timestamps: &[i64], + sequence: SequenceNumber, +) -> KeyValues { + let values = timestamps.iter().map(|v| Some(*v as f64)); + + build_key_values_with_ts_seq_values( + schema, + k0, + k1, + timestamps.iter().copied(), + values, + sequence, + ) +} + /// Builds key values with timestamps (ms) and sequences for test. pub(crate) fn build_key_values_with_ts_seq_values( schema: &RegionMetadataRef, k0: String, - k1: i64, + k1: u32, timestamps: impl Iterator, values: impl Iterator>, sequence: SequenceNumber, @@ -177,7 +206,7 @@ pub(crate) fn build_key_values_with_ts_seq_values( value_data: Some(ValueData::StringValue(k0.clone())), }, api::v1::Value { - value_data: Some(ValueData::I64Value(k1)), + value_data: Some(ValueData::U32Value(k1)), }, api::v1::Value { value_data: Some(ValueData::TimestampMillisecondValue(ts)), @@ -221,12 +250,12 @@ pub(crate) fn encode_keys( } /// Encode one key. -pub(crate) fn encode_key(k0: &str, k1: i64) -> Vec { +pub(crate) fn encode_key(k0: &str, k1: u32) -> Vec { let row_codec = McmpRowCodec::new(vec![ SortField::new(ConcreteDataType::string_datatype()), - SortField::new(ConcreteDataType::int64_datatype()), + SortField::new(ConcreteDataType::uint32_datatype()), ]); - let key = [ValueRef::String(k0), ValueRef::Int64(k1)]; + let key = [ValueRef::String(k0), ValueRef::UInt32(k1)]; row_codec.encode(key.into_iter()).unwrap() } @@ -234,7 +263,7 @@ pub(crate) fn encode_key(k0: &str, k1: i64) -> Vec { pub(crate) fn encode_key_by_kv(key_value: &KeyValue) -> Vec { let row_codec = McmpRowCodec::new(vec![ SortField::new(ConcreteDataType::string_datatype()), - SortField::new(ConcreteDataType::int64_datatype()), + SortField::new(ConcreteDataType::uint32_datatype()), ]); row_codec.encode(key_value.primary_keys()).unwrap() } From 5723516bfe4f76c693b6b9d481753e5b8280b683 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 22 Feb 2024 14:46:31 +0800 Subject: [PATCH 08/13] feat: add new and write_row to DataParts --- src/mito2/src/memtable/merge_tree/data.rs | 17 ++++++++++++++--- src/mito2/src/memtable/merge_tree/partition.rs | 10 +++++----- src/mito2/src/memtable/merge_tree/shard.rs | 15 ++++++--------- .../src/memtable/merge_tree/shard_builder.rs | 13 +++++-------- 4 files changed, 30 insertions(+), 25 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index b8a01bce7c64..09c58125eb93 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -531,13 +531,24 @@ impl<'a> DataPartEncoder<'a> { } } -// TODO(yingwen): rm pub(crate) and add a new method. /// Data parts under a shard. pub struct DataParts { /// The active writing buffer. - pub(crate) active: DataBuffer, + active: DataBuffer, /// immutable (encoded) parts. - pub(crate) frozen: Vec, + frozen: Vec, +} + +impl DataParts { + /// Creates a new [DataParts]. + pub fn new(active: DataBuffer, frozen: Vec) -> DataParts { + DataParts { active, frozen } + } + + /// Writes a row into parts. + pub fn write_row(&mut self, pk_index: PkIndex, kv: KeyValue) { + self.active.write_row(pk_index, kv) + } } /// Format of immutable data part. diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs index 854da5c45f05..6af3e3196dc8 100644 --- a/src/mito2/src/memtable/merge_tree/partition.rs +++ b/src/mito2/src/memtable/merge_tree/partition.rs @@ -96,10 +96,10 @@ impl Partition { if inner.shards.is_empty() { // Create the shard if it doesn't exist. let shard_id = inner.active_shard_id; - let data_parts = DataParts { - active: DataBuffer::with_capacity(inner.metadata.clone(), DATA_INIT_CAP), - frozen: Vec::new(), - }; + let data_parts = DataParts::new( + DataBuffer::with_capacity(inner.metadata.clone(), DATA_INIT_CAP), + Vec::new(), + ); inner.shards.push(Shard::new(shard_id, None, data_parts)); inner.active_shard_id += 1; } @@ -218,7 +218,7 @@ struct Inner { /// Shard whose dictionary is active. shard_builder: ShardBuilder, active_shard_id: ShardId, - /// Shards with frozon dictionary. + /// Shards with frozen dictionary. shards: Vec, num_rows: usize, } diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/merge_tree/shard.rs index 4fc041751dc9..f1aa9b3c4694 100644 --- a/src/mito2/src/memtable/merge_tree/shard.rs +++ b/src/mito2/src/memtable/merge_tree/shard.rs @@ -55,7 +55,7 @@ impl Shard { pub fn write_key_value(&mut self, pk_id: PkId, key_value: KeyValue) { debug_assert_eq!(self.shard_id, pk_id.shard_id); - self.data_parts.active.write_row(pk_id.pk_index, key_value); + self.data_parts.write_row(pk_id.pk_index, key_value); } /// Scans the shard. @@ -77,10 +77,10 @@ impl Shard { Shard { shard_id: self.shard_id, key_dict: self.key_dict.clone(), - data_parts: DataParts { - active: DataBuffer::with_capacity(metadata, DATA_INIT_CAP), - frozen: Vec::new(), - }, + data_parts: DataParts::new( + DataBuffer::with_capacity(metadata, DATA_INIT_CAP), + Vec::new(), + ), } } } @@ -147,10 +147,7 @@ mod tests { } let dict = dict_builder.finish().unwrap(); - let data_parts = DataParts { - active: DataBuffer::with_capacity(metadata, DATA_INIT_CAP), - frozen: vec![], - }; + let data_parts = DataParts::new(DataBuffer::with_capacity(metadata, DATA_INIT_CAP), vec![]); Shard::new(shard_id, Some(Arc::new(dict)), data_parts) } diff --git a/src/mito2/src/memtable/merge_tree/shard_builder.rs b/src/mito2/src/memtable/merge_tree/shard_builder.rs index 7910513c2ec5..f605e656f9f9 100644 --- a/src/mito2/src/memtable/merge_tree/shard_builder.rs +++ b/src/mito2/src/memtable/merge_tree/shard_builder.rs @@ -90,10 +90,10 @@ impl ShardBuilder { }; // build data parts. - let data_parts = DataParts { - active: DataBuffer::with_capacity(metadata, DATA_INIT_CAP), - frozen: vec![data_part], - }; + let data_parts = DataParts::new( + DataBuffer::with_capacity(metadata, DATA_INIT_CAP), + vec![data_part], + ); let key_dict = key_dict.map(Arc::new); Ok(Some(Shard::new(shard_id, key_dict, data_parts))) @@ -171,10 +171,7 @@ mod tests { } let dict = dict_builder.finish().unwrap(); - let data_parts = DataParts { - active: DataBuffer::with_capacity(metadata, DATA_INIT_CAP), - frozen: vec![], - }; + let data_parts = DataParts::new(DataBuffer::with_capacity(metadata, DATA_INIT_CAP), vec![]); Shard::new(shard_id, Some(Arc::new(dict)), data_parts) } From cef97b347304cc1f00b71a80743222ea895e95e3 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 22 Feb 2024 15:17:18 +0800 Subject: [PATCH 09/13] refactor: partition freeze shards --- .../src/memtable/merge_tree/partition.rs | 71 ++++++++++--------- .../src/memtable/merge_tree/shard_builder.rs | 12 ++-- 2 files changed, 43 insertions(+), 40 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs index 6af3e3196dc8..a265c65a07a4 100644 --- a/src/mito2/src/memtable/merge_tree/partition.rs +++ b/src/mito2/src/memtable/merge_tree/partition.rs @@ -46,13 +46,7 @@ impl Partition { let shard_builder = ShardBuilder::new(metadata.clone(), config); Partition { - inner: RwLock::new(Inner { - metadata, - shard_builder, - active_shard_id: 0, - shards: Vec::new(), - num_rows: 0, - }), + inner: RwLock::new(Inner::new(metadata, shard_builder)), } } @@ -72,12 +66,7 @@ impl Partition { } if inner.shard_builder.should_freeze() { - let shard_id = inner.active_shard_id; - let metadata = inner.metadata.clone(); - // Safety: The builder is not empty. - let shard = inner.shard_builder.finish(shard_id, metadata)?.unwrap(); - inner.active_shard_id += 1; - inner.shards.push(shard); + inner.freeze_active_shard()?; } // Write to the shard builder. @@ -93,20 +82,12 @@ impl Partition { pub fn write_no_key(&self, key_value: KeyValue) { let mut inner = self.inner.write().unwrap(); // If no primary key, always write to the first shard. - if inner.shards.is_empty() { - // Create the shard if it doesn't exist. - let shard_id = inner.active_shard_id; - let data_parts = DataParts::new( - DataBuffer::with_capacity(inner.metadata.clone(), DATA_INIT_CAP), - Vec::new(), - ); - inner.shards.push(Shard::new(shard_id, None, data_parts)); - inner.active_shard_id += 1; - } + debug_assert!(!inner.shards.is_empty()); + debug_assert_eq!(1, inner.active_shard_id); // A dummy pk id. let pk_id = PkId { - shard_id: inner.active_shard_id - 1, + shard_id: 0, pk_index: 0, }; inner.shards[0].write_key_value(pk_id, key_value); @@ -125,15 +106,7 @@ impl Partition { /// Freezes the partition. pub fn freeze(&self) -> Result<()> { let mut inner = self.inner.write().unwrap(); - // TODO(yingwen): wrap a method to bump shard id and create shard? - let shard_id = inner.active_shard_id; - let metadata = inner.metadata.clone(); - let Some(shard) = inner.shard_builder.finish(shard_id, metadata)? else { - return Ok(()); - }; - inner.active_shard_id += 1; - - inner.shards.push(shard); + inner.freeze_active_shard()?; Ok(()) } @@ -224,6 +197,27 @@ struct Inner { } impl Inner { + fn new(metadata: RegionMetadataRef, shard_builder: ShardBuilder) -> Self { + let mut inner = Self { + metadata, + shard_builder, + active_shard_id: 0, + shards: Vec::new(), + num_rows: 0, + }; + + if inner.metadata.primary_key.is_empty() { + let data_parts = DataParts::new( + DataBuffer::with_capacity(inner.metadata.clone(), DATA_INIT_CAP), + Vec::new(), + ); + inner.shards.push(Shard::new(0, None, data_parts)); + inner.active_shard_id = 1; + } + + inner + } + fn find_key_in_shards(&self, primary_key: &[u8]) -> Option { for shard in &self.shards { if let Some(pkid) = shard.find_id_by_key(primary_key) { @@ -243,4 +237,15 @@ impl Inner { } } } + + fn freeze_active_shard(&mut self) -> Result<()> { + if let Some(shard) = self + .shard_builder + .finish(self.active_shard_id, self.metadata.clone())? + { + self.active_shard_id += 1; + self.shards.push(shard); + } + Ok(()) + } } diff --git a/src/mito2/src/memtable/merge_tree/shard_builder.rs b/src/mito2/src/memtable/merge_tree/shard_builder.rs index f605e656f9f9..78e26bc1fb03 100644 --- a/src/mito2/src/memtable/merge_tree/shard_builder.rs +++ b/src/mito2/src/memtable/merge_tree/shard_builder.rs @@ -57,23 +57,20 @@ impl ShardBuilder { self.data_buffer.write_row(pk_index, key_value); } - /// Returns true if the builder is empty. - pub fn is_empty(&self) -> bool { - self.data_buffer.is_empty() - } - /// Returns true if the builder need to freeze. pub fn should_freeze(&self) -> bool { self.dict_builder.is_full() || self.data_buffer.num_rows() == self.data_freeze_threshold } /// Builds a new shard and resets the builder. + /// + /// Returns `None` if the builder is empty. pub fn finish( &mut self, shard_id: ShardId, metadata: RegionMetadataRef, ) -> Result> { - if self.is_empty() { + if self.data_buffer.is_empty() { return Ok(None); } @@ -183,13 +180,14 @@ mod tests { let config = MergeTreeConfig::default(); let mut shard_builder = ShardBuilder::new(metadata.clone(), &config); let mut metrics = WriteMetrics::default(); + assert!(shard_builder.finish(1, metadata.clone()).unwrap().is_none()); - assert!(shard_builder.is_empty()); for key_values in &input { for kv in key_values.iter() { let key = encode_key_by_kv(&kv); shard_builder.write_with_key(&key, kv, &mut metrics); } } + shard_builder.finish(1, metadata).unwrap().unwrap(); } } From 6f9b941376264868a0b637b5270c40775768986c Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 22 Feb 2024 15:25:12 +0800 Subject: [PATCH 10/13] refactor: write_with_pk_id --- src/mito2/src/memtable/merge_tree/partition.rs | 4 ++-- src/mito2/src/memtable/merge_tree/shard.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs index a265c65a07a4..3bb41b3e0cf9 100644 --- a/src/mito2/src/memtable/merge_tree/partition.rs +++ b/src/mito2/src/memtable/merge_tree/partition.rs @@ -90,7 +90,7 @@ impl Partition { shard_id: 0, pk_index: 0, }; - inner.shards[0].write_key_value(pk_id, key_value); + inner.shards[0].write_with_pk_id(pk_id, key_value); inner.num_rows += 1; } @@ -231,7 +231,7 @@ impl Inner { fn write_to_shard(&mut self, pk_id: PkId, key_value: KeyValue) { for shard in &mut self.shards { if shard.shard_id == pk_id.shard_id { - shard.write_key_value(pk_id, key_value); + shard.write_with_pk_id(pk_id, key_value); self.num_rows += 1; return; } diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/merge_tree/shard.rs index f1aa9b3c4694..f65d95dbb426 100644 --- a/src/mito2/src/memtable/merge_tree/shard.rs +++ b/src/mito2/src/memtable/merge_tree/shard.rs @@ -52,7 +52,7 @@ impl Shard { } /// Writes a key value into the shard. - pub fn write_key_value(&mut self, pk_id: PkId, key_value: KeyValue) { + pub fn write_with_pk_id(&mut self, pk_id: PkId, key_value: KeyValue) { debug_assert_eq!(self.shard_id, pk_id.shard_id); self.data_parts.write_row(pk_id.pk_index, key_value); @@ -179,7 +179,7 @@ mod tests { for kv in key_values.iter() { let key = encode_key_by_kv(&kv); let pk_id = shard.find_id_by_key(&key).unwrap(); - shard.write_key_value(pk_id, kv); + shard.write_with_pk_id(pk_id, kv); } } } From d7f6a5f81099681b9454483f45ee6a053a472ad0 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 22 Feb 2024 15:31:08 +0800 Subject: [PATCH 11/13] style: fix clippy --- src/mito2/src/memtable/merge_tree/shard.rs | 2 +- src/mito2/src/memtable/merge_tree/shard_builder.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/merge_tree/shard.rs index f65d95dbb426..7299e160d508 100644 --- a/src/mito2/src/memtable/merge_tree/shard.rs +++ b/src/mito2/src/memtable/merge_tree/shard.rs @@ -143,7 +143,7 @@ mod tests { encode_keys(&metadata, kvs, &mut keys); } for key in &keys { - dict_builder.insert_key(&key, &mut metrics); + dict_builder.insert_key(key, &mut metrics); } let dict = dict_builder.finish().unwrap(); diff --git a/src/mito2/src/memtable/merge_tree/shard_builder.rs b/src/mito2/src/memtable/merge_tree/shard_builder.rs index 78e26bc1fb03..9bf2c27ab5b3 100644 --- a/src/mito2/src/memtable/merge_tree/shard_builder.rs +++ b/src/mito2/src/memtable/merge_tree/shard_builder.rs @@ -164,7 +164,7 @@ mod tests { encode_keys(&metadata, kvs, &mut keys); } for key in &keys { - dict_builder.insert_key(&key, &mut metrics); + dict_builder.insert_key(key, &mut metrics); } let dict = dict_builder.finish().unwrap(); From 48801c60d365b31c1119a4edf8efaca03f78d170 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 22 Feb 2024 21:32:01 +0800 Subject: [PATCH 12/13] chore: add methods to get pk weights --- src/mito2/src/memtable/merge_tree/dict.rs | 29 ++++++++++++++++------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/dict.rs b/src/mito2/src/memtable/merge_tree/dict.rs index b17dd7798910..5c1c3c3a57f6 100644 --- a/src/mito2/src/memtable/merge_tree/dict.rs +++ b/src/mito2/src/memtable/merge_tree/dict.rs @@ -181,15 +181,30 @@ impl DictBuilderReader { self.key_by_pk_index(pk_index) } + /// Gets the key by the pk index. + pub fn key_by_pk_index(&self, pk_index: PkIndex) -> &[u8] { + let block_idx = pk_index / MAX_KEYS_PER_BLOCK; + self.blocks[block_idx as usize].key_by_pk_index(pk_index) + } + + /// Returns pk weights to sort a data part and replaces pk indices. + pub(crate) fn pk_weights_to_sort_data(&self) -> Vec { + compute_pk_weights(&self.sorted_pk_indices) + } + /// Returns pk indices sorted by keys. pub(crate) fn sorted_pk_index(&self) -> &[PkIndex] { &self.sorted_pk_indices } +} - fn key_by_pk_index(&self, pk_index: PkIndex) -> &[u8] { - let block_idx = pk_index / MAX_KEYS_PER_BLOCK; - self.blocks[block_idx as usize].key_by_pk_index(pk_index) +/// Returns pk weights to sort a data part and replaces pk indices. +fn compute_pk_weights(sorted_pk_indices: &[PkIndex]) -> Vec { + let mut pk_weights = vec![0; sorted_pk_indices.len()]; + for (weight, pk_index) in sorted_pk_indices.iter().enumerate() { + pk_weights[*pk_index as usize] = weight as u16; } + pk_weights } /// A key dictionary. @@ -223,13 +238,9 @@ impl KeyDict { self.pk_to_index.get(key).copied() } - /// Sets the pk weights to sort a data part and replaces pk indices. + /// Returns pk weights to sort a data part and replaces pk indices. pub(crate) fn pk_weights_to_sort_data(&self) -> Vec { - let mut pk_weights = vec![0; self.key_positions.len()]; - for (weight, pk_index) in self.key_positions.iter().enumerate() { - pk_weights[*pk_index as usize] = weight as u16; - } - pk_weights + compute_pk_weights(&self.key_positions) } /// Returns the shared memory size. From ffa5354003b5b892e8e315d7e774ddb201464188 Mon Sep 17 00:00:00 2001 From: evenyag Date: Fri, 23 Feb 2024 15:08:02 +0800 Subject: [PATCH 13/13] chroe: fix compiler errors --- src/mito2/src/memtable/merge_tree/data.rs | 5 +++++ src/mito2/src/memtable/merge_tree/partition.rs | 7 ++----- src/mito2/src/memtable/merge_tree/shard.rs | 9 +++------ src/mito2/src/memtable/merge_tree/shard_builder.rs | 7 ++----- 4 files changed, 12 insertions(+), 16 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index cea4d9baccad..f414ca38f99a 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -678,6 +678,11 @@ impl DataParts { } } + pub(crate) fn with_frozen(mut self, frozen: Vec) -> Self { + self.frozen = frozen; + self + } + /// Writes a row into parts. pub fn write_row(&mut self, pk_index: PkIndex, kv: KeyValue) { self.active.write_row(pk_index, kv) diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs index 3bb41b3e0cf9..dc817d134ded 100644 --- a/src/mito2/src/memtable/merge_tree/partition.rs +++ b/src/mito2/src/memtable/merge_tree/partition.rs @@ -26,7 +26,7 @@ use store_api::storage::ColumnId; use crate::error::Result; use crate::memtable::key_values::KeyValue; -use crate::memtable::merge_tree::data::{DataBuffer, DataParts, DATA_INIT_CAP}; +use crate::memtable::merge_tree::data::{DataParts, DATA_INIT_CAP}; use crate::memtable::merge_tree::metrics::WriteMetrics; use crate::memtable::merge_tree::shard::Shard; use crate::memtable::merge_tree::shard_builder::ShardBuilder; @@ -207,10 +207,7 @@ impl Inner { }; if inner.metadata.primary_key.is_empty() { - let data_parts = DataParts::new( - DataBuffer::with_capacity(inner.metadata.clone(), DATA_INIT_CAP), - Vec::new(), - ); + let data_parts = DataParts::new(inner.metadata.clone(), DATA_INIT_CAP); inner.shards.push(Shard::new(0, None, data_parts)); inner.active_shard_id = 1; } diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/merge_tree/shard.rs index 7299e160d508..86c5ea18f1a2 100644 --- a/src/mito2/src/memtable/merge_tree/shard.rs +++ b/src/mito2/src/memtable/merge_tree/shard.rs @@ -17,7 +17,7 @@ use store_api::metadata::RegionMetadataRef; use crate::memtable::key_values::KeyValue; -use crate::memtable::merge_tree::data::{DataBuffer, DataParts, DATA_INIT_CAP}; +use crate::memtable::merge_tree::data::{DataParts, DATA_INIT_CAP}; use crate::memtable::merge_tree::dict::KeyDictRef; use crate::memtable::merge_tree::{PkId, ShardId}; @@ -77,10 +77,7 @@ impl Shard { Shard { shard_id: self.shard_id, key_dict: self.key_dict.clone(), - data_parts: DataParts::new( - DataBuffer::with_capacity(metadata, DATA_INIT_CAP), - Vec::new(), - ), + data_parts: DataParts::new(metadata, DATA_INIT_CAP), } } } @@ -147,7 +144,7 @@ mod tests { } let dict = dict_builder.finish().unwrap(); - let data_parts = DataParts::new(DataBuffer::with_capacity(metadata, DATA_INIT_CAP), vec![]); + let data_parts = DataParts::new(metadata, DATA_INIT_CAP); Shard::new(shard_id, Some(Arc::new(dict)), data_parts) } diff --git a/src/mito2/src/memtable/merge_tree/shard_builder.rs b/src/mito2/src/memtable/merge_tree/shard_builder.rs index 9bf2c27ab5b3..f9a32a17a563 100644 --- a/src/mito2/src/memtable/merge_tree/shard_builder.rs +++ b/src/mito2/src/memtable/merge_tree/shard_builder.rs @@ -87,10 +87,7 @@ impl ShardBuilder { }; // build data parts. - let data_parts = DataParts::new( - DataBuffer::with_capacity(metadata, DATA_INIT_CAP), - vec![data_part], - ); + let data_parts = DataParts::new(metadata, DATA_INIT_CAP).with_frozen(vec![data_part]); let key_dict = key_dict.map(Arc::new); Ok(Some(Shard::new(shard_id, key_dict, data_parts))) @@ -168,7 +165,7 @@ mod tests { } let dict = dict_builder.finish().unwrap(); - let data_parts = DataParts::new(DataBuffer::with_capacity(metadata, DATA_INIT_CAP), vec![]); + let data_parts = DataParts::new(metadata, DATA_INIT_CAP); Shard::new(shard_id, Some(Arc::new(dict)), data_parts) }