diff --git a/src/mito2/src/memtable/merge_tree.rs b/src/mito2/src/memtable/merge_tree.rs index a2015c45d327..2a7799524342 100644 --- a/src/mito2/src/memtable/merge_tree.rs +++ b/src/mito2/src/memtable/merge_tree.rs @@ -459,4 +459,73 @@ mod tests { } assert_eq!(expect, v0_all); } + + #[test] + fn test_write_iter_multi_keys() { + write_iter_multi_keys(1, 100); + write_iter_multi_keys(2, 100); + write_iter_multi_keys(4, 100); + write_iter_multi_keys(8, 5); + write_iter_multi_keys(2, 10); + } + + fn write_iter_multi_keys(max_keys: usize, freeze_threshold: usize) { + let metadata = memtable_util::metadata_with_primary_key(vec![1, 0], true); + let memtable = MergeTreeMemtable::new( + 1, + metadata.clone(), + None, + &MergeTreeConfig { + index_max_keys_per_shard: max_keys, + data_freeze_threshold: freeze_threshold, + ..Default::default() + }, + ); + + let mut data = Vec::new(); + // 4 partitions, each partition 4 pks. + for i in 0..4 { + for j in 0..4 { + // key: i, a{j} + let timestamps = [11, 13, 1, 5, 3, 7, 9]; + let key = format!("a{j}"); + let kvs = + memtable_util::build_key_values(&metadata, key.clone(), i, &timestamps, 0); + memtable.write(&kvs).unwrap(); + for ts in timestamps { + data.push((i, key.clone(), ts)); + } + } + for j in 0..4 { + // key: i, a{j} + let timestamps = [10, 2, 4, 8, 6]; + let key = format!("a{j}"); + let kvs = + memtable_util::build_key_values(&metadata, key.clone(), i, &timestamps, 200); + memtable.write(&kvs).unwrap(); + for ts in timestamps { + data.push((i, key.clone(), ts)); + } + } + } + data.sort_unstable(); + + let expect = data.into_iter().map(|x| x.2).collect::<Vec<_>>(); + let iter = memtable.iter(None, None).unwrap(); + let read = iter + .flat_map(|batch| { + batch + .unwrap() + .timestamps() + .as_any() + .downcast_ref::() + .unwrap() + .iter_data() + .collect::<Vec<_>>() + .into_iter() + }) + .map(|v| v.unwrap().0.value()) + .collect::<Vec<_>>(); + assert_eq!(expect, read); + } } diff --git a/src/mito2/src/memtable/merge_tree/dict.rs 
b/src/mito2/src/memtable/merge_tree/dict.rs index 4be3bac5c3fa..126b8bdb13c8 100644 --- a/src/mito2/src/memtable/merge_tree/dict.rs +++ b/src/mito2/src/memtable/merge_tree/dict.rs @@ -108,7 +108,7 @@ impl KeyDictBuilder { } /// Finishes the builder. - pub fn finish(&mut self) -> Option<KeyDict> { + pub fn finish(&mut self, pk_to_index: &mut BTreeMap<Vec<u8>, PkIndex>) -> Option<KeyDict> { if self.key_buffer.is_empty() { return None; } @@ -116,11 +116,9 @@ impl KeyDictBuilder { // Finishes current dict block and resets the pk index. let dict_block = self.key_buffer.finish(true); self.dict_blocks.push(dict_block); - // Takes the pk to index map. - let mut pk_to_index = std::mem::take(&mut self.pk_to_index); // Computes key position and then alter pk index. - let mut key_positions = vec![0; pk_to_index.len()]; - for (i, pk_index) in pk_to_index.values_mut().enumerate() { + let mut key_positions = vec![0; self.pk_to_index.len()]; + for (i, pk_index) in self.pk_to_index.values_mut().enumerate() { // The position of the i-th key is the old pk index. key_positions[i] = *pk_index; // Overwrites the pk index. @@ -129,9 +127,9 @@ impl KeyDictBuilder { self.num_keys = 0; let key_bytes_in_index = self.key_bytes_in_index; self.key_bytes_in_index = 0; + *pk_to_index = std::mem::take(&mut self.pk_to_index); Some(KeyDict { - pk_to_index, dict_blocks: std::mem::take(&mut self.dict_blocks), key_positions, key_bytes_in_index, @@ -214,8 +212,6 @@ fn compute_pk_weights(sorted_pk_indices: &[PkIndex], pk_weights: &mut Vec) #[derive(Default)] pub struct KeyDict { // TODO(yingwen): We can use key_positions to do a binary search. - /// Key map to find a key in the dict. - pk_to_index: PkIndexMap, /// Unsorted key blocks. dict_blocks: Vec, /// Maps pk index to position of the key in [Self::dict_blocks]. @@ -237,11 +233,6 @@ impl KeyDict { self.dict_blocks[block_index as usize].key_by_pk_index(position) } - /// Gets the pk index by the key. 
- pub fn get_pk_index(&self, key: &[u8]) -> Option { - self.pk_to_index.get(key).copied() - } - /// Returns pk weights to sort a data part and replaces pk indices. pub(crate) fn pk_weights_to_sort_data(&self) -> Vec { let mut pk_weights = Vec::with_capacity(self.key_positions.len()); @@ -442,7 +433,7 @@ mod tests { assert_eq!(key_bytes, builder.key_bytes_in_index); assert_eq!(8850, builder.memory_size()); - let dict = builder.finish().unwrap(); + let dict = builder.finish(&mut BTreeMap::new()).unwrap(); assert_eq!(0, builder.key_bytes_in_index); assert_eq!(key_bytes, dict.key_bytes_in_index); assert!(dict.shared_memory_size() > key_bytes); @@ -458,7 +449,7 @@ mod tests { builder.insert_key(key.as_bytes(), &mut metrics); } assert!(builder.is_full()); - builder.finish(); + builder.finish(&mut BTreeMap::new()); assert!(!builder.is_full()); assert_eq!(0, builder.insert_key(b"a0", &mut metrics)); diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs index b147045256c3..e32381c4f5a7 100644 --- a/src/mito2/src/memtable/merge_tree/partition.rs +++ b/src/mito2/src/memtable/merge_tree/partition.rs @@ -16,7 +16,7 @@ //! //! We only support partitioning the tree by pre-defined internal columns. -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::sync::{Arc, RwLock}; use api::v1::SemanticType; @@ -67,17 +67,18 @@ impl Partition { metrics: &mut WriteMetrics, ) -> Result<()> { let mut inner = self.inner.write().unwrap(); - // Now we ensure one key only exists in one shard. + // Freeze the shard builder if needed. + if inner.shard_builder.should_freeze() { + inner.freeze_active_shard()?; + } + + // Finds key in shards, now we ensure one key only exists in one shard. if let Some(pk_id) = inner.find_key_in_shards(primary_key) { // Key already in shards. 
inner.write_to_shard(pk_id, key_value); return Ok(()); } - if inner.shard_builder.should_freeze() { - inner.freeze_active_shard()?; - } - // Write to the shard builder. inner .shard_builder @@ -142,19 +143,26 @@ impl Partition { /// /// Must freeze the partition before fork. pub fn fork(&self, metadata: &RegionMetadataRef, config: &MergeTreeConfig) -> Partition { - let inner = self.inner.read().unwrap(); - debug_assert!(inner.shard_builder.is_empty()); - // TODO(yingwen): TTL or evict shards. - let shard_builder = ShardBuilder::new( - metadata.clone(), - config, - inner.shard_builder.current_shard_id(), - ); - let shards = inner - .shards - .iter() - .map(|shard| shard.fork(metadata.clone())) - .collect(); + let (shards, shard_builder) = { + let inner = self.inner.read().unwrap(); + debug_assert!(inner.shard_builder.is_empty()); + let shard_builder = ShardBuilder::new( + metadata.clone(), + config, + inner.shard_builder.current_shard_id(), + ); + let shards = inner + .shards + .iter() + .map(|shard| shard.fork(metadata.clone())) + .collect(); + + (shards, shard_builder) + }; + let pk_to_pk_id = { + let mut inner = self.inner.write().unwrap(); + std::mem::take(&mut inner.pk_to_pk_id) + }; Partition { inner: RwLock::new(Inner { @@ -162,6 +170,8 @@ impl Partition { shard_builder, shards, num_rows: 0, + pk_to_pk_id, + frozen: false, }), dedup: self.dedup, } @@ -461,11 +471,14 @@ fn data_batch_to_batch( /// A key only exists in one shard. struct Inner { metadata: RegionMetadataRef, + /// Map to index pk to pk id. + pk_to_pk_id: HashMap<Vec<u8>, PkId>, /// Shard whose dictionary is active. shard_builder: ShardBuilder, /// Shards with frozen dictionary. 
shards: Vec<Shard>, num_rows: usize, + frozen: bool, } impl Inner { @@ -479,20 +492,17 @@ impl Inner { let shard_builder = ShardBuilder::new(metadata.clone(), config, current_shard_id); Self { metadata, + pk_to_pk_id: HashMap::new(), shard_builder, shards, num_rows: 0, + frozen: false, } } fn find_key_in_shards(&self, primary_key: &[u8]) -> Option<PkId> { - for shard in &self.shards { - if let Some(pkid) = shard.find_id_by_key(primary_key) { - return Some(pkid); - } - } - - None + assert!(!self.frozen); + self.pk_to_pk_id.get(primary_key).copied() } fn write_to_shard(&mut self, pk_id: PkId, key_value: KeyValue) { @@ -506,7 +516,10 @@ impl Inner { } fn freeze_active_shard(&mut self) -> Result<()> { - if let Some(shard) = self.shard_builder.finish(self.metadata.clone())? { + if let Some(shard) = self + .shard_builder + .finish(self.metadata.clone(), &mut self.pk_to_pk_id)? + { self.shards.push(shard); } Ok(()) diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/merge_tree/shard.rs index 9f975d7002f4..9d832e9ada03 100644 --- a/src/mito2/src/memtable/merge_tree/shard.rs +++ b/src/mito2/src/memtable/merge_tree/shard.rs @@ -52,17 +52,6 @@ impl Shard { } } - /// Returns the pk id of the key if it exists. - pub fn find_id_by_key(&self, key: &[u8]) -> Option<PkId> { - let key_dict = self.key_dict.as_ref()?; - let pk_index = key_dict.get_pk_index(key)?; - - Some(PkId { - shard_id: self.shard_id, - pk_index, - }) - } - /// Writes a key value into the shard. 
pub fn write_with_pk_id(&mut self, pk_id: PkId, key_value: KeyValue) { debug_assert_eq!(self.shard_id, pk_id.shard_id); @@ -319,43 +308,54 @@ impl Node for ShardNode { #[cfg(test)] mod tests { + use std::collections::BTreeMap; use std::sync::Arc; use super::*; + use crate::memtable::merge_tree::data::timestamp_array_to_i64_slice; use crate::memtable::merge_tree::dict::KeyDictBuilder; use crate::memtable::merge_tree::metrics::WriteMetrics; use crate::memtable::merge_tree::PkIndex; use crate::memtable::KeyValues; use crate::test_util::memtable_util::{ - build_key_values_with_ts_seq_values, encode_key, encode_key_by_kv, encode_keys, - metadata_for_test, + build_key_values_with_ts_seq_values, encode_keys, metadata_for_test, }; - fn input_with_key(metadata: &RegionMetadataRef) -> Vec { + /// Returns key values and expect pk index. + fn input_with_key(metadata: &RegionMetadataRef) -> Vec<(KeyValues, PkIndex)> { vec![ - build_key_values_with_ts_seq_values( - metadata, - "shard".to_string(), + ( + build_key_values_with_ts_seq_values( + metadata, + "shard".to_string(), + 2, + [20, 21].into_iter(), + [Some(0.0), Some(1.0)].into_iter(), + 0, + ), 2, - [20, 21].into_iter(), - [Some(0.0), Some(1.0)].into_iter(), - 0, ), - build_key_values_with_ts_seq_values( - metadata, - "shard".to_string(), + ( + build_key_values_with_ts_seq_values( + metadata, + "shard".to_string(), + 0, + [0, 1].into_iter(), + [Some(0.0), Some(1.0)].into_iter(), + 1, + ), 0, - [0, 1].into_iter(), - [Some(0.0), Some(1.0)].into_iter(), - 1, ), - build_key_values_with_ts_seq_values( - metadata, - "shard".to_string(), + ( + build_key_values_with_ts_seq_values( + metadata, + "shard".to_string(), + 1, + [10, 11].into_iter(), + [Some(0.0), Some(1.0)].into_iter(), + 2, + ), 1, - [10, 11].into_iter(), - [Some(0.0), Some(1.0)].into_iter(), - 2, ), ] } @@ -363,55 +363,51 @@ mod tests { fn new_shard_with_dict( shard_id: ShardId, metadata: RegionMetadataRef, - input: &[KeyValues], + input: &[(KeyValues, PkIndex)], ) -> 
Shard { let mut dict_builder = KeyDictBuilder::new(1024); let mut metrics = WriteMetrics::default(); let mut keys = Vec::with_capacity(input.len()); - for kvs in input { + for (kvs, _) in input { encode_keys(&metadata, kvs, &mut keys); } for key in &keys { dict_builder.insert_key(key, &mut metrics); } - let dict = dict_builder.finish().unwrap(); + let dict = dict_builder.finish(&mut BTreeMap::new()).unwrap(); let data_parts = DataParts::new(metadata, DATA_INIT_CAP, true); Shard::new(shard_id, Some(Arc::new(dict)), data_parts, true) } #[test] - fn test_shard_find_by_key() { - let metadata = metadata_for_test(); - let input = input_with_key(&metadata); - let shard = new_shard_with_dict(8, metadata, &input); - for i in 0..input.len() { - let key = encode_key("shard", i as u32); - assert_eq!( - PkId { - shard_id: 8, - pk_index: i as PkIndex, - }, - shard.find_id_by_key(&key).unwrap() - ); - } - assert!(shard.find_id_by_key(&encode_key("shard", 100)).is_none()); - } - - #[test] - fn test_write_shard() { + fn test_write_read_shard() { let metadata = metadata_for_test(); let input = input_with_key(&metadata); let mut shard = new_shard_with_dict(8, metadata, &input); assert!(shard.is_empty()); - for key_values in &input { + for (key_values, pk_index) in &input { for kv in key_values.iter() { - let key = encode_key_by_kv(&kv); - let pk_id = shard.find_id_by_key(&key).unwrap(); + let pk_id = PkId { + shard_id: shard.shard_id, + pk_index: *pk_index, + }; shard.write_with_pk_id(pk_id, kv); } } assert!(!shard.is_empty()); + + let mut reader = shard.read().unwrap(); + let mut timestamps = Vec::new(); + while reader.is_valid() { + let rb = reader.current_data_batch().slice_record_batch(); + let ts_array = rb.column(1); + let ts_slice = timestamp_array_to_i64_slice(ts_array); + timestamps.extend_from_slice(ts_slice); + + reader.next().unwrap(); + } + assert_eq!(vec![0, 1, 10, 11, 20, 21], timestamps); } } diff --git a/src/mito2/src/memtable/merge_tree/shard_builder.rs 
b/src/mito2/src/memtable/merge_tree/shard_builder.rs index c63a1b9f261c..f20a38fe672d 100644 --- a/src/mito2/src/memtable/merge_tree/shard_builder.rs +++ b/src/mito2/src/memtable/merge_tree/shard_builder.rs @@ -14,6 +14,7 @@ //! Builder of a shard. +use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use store_api::metadata::RegionMetadataRef; @@ -78,14 +79,31 @@ impl ShardBuilder { /// Builds a new shard and resets the builder. /// /// Returns `None` if the builder is empty. - pub fn finish(&mut self, metadata: RegionMetadataRef) -> Result<Option<Shard>> { + pub fn finish( + &mut self, + metadata: RegionMetadataRef, + pk_to_pk_id: &mut HashMap<Vec<u8>, PkId>, + ) -> Result<Option<Shard>> { if self.is_empty() { return Ok(None); } - let key_dict = self.dict_builder.finish(); + let mut pk_to_index = BTreeMap::new(); + let key_dict = self.dict_builder.finish(&mut pk_to_index); let data_part = match &key_dict { Some(dict) => { + // Adds mapping to the map. + pk_to_pk_id.reserve(pk_to_index.len()); + for (k, pk_index) in pk_to_index { + pk_to_pk_id.insert( + k, + PkId { + shard_id: self.current_shard_id, + pk_index, + }, + ); + } + let pk_weights = dict.pk_weights_to_sort_data(); self.data_buffer.freeze(Some(&pk_weights), true)? 
} @@ -162,6 +180,7 @@ impl ShardBuilderReader { mod tests { use super::*; + use crate::memtable::merge_tree::data::timestamp_array_to_i64_slice; use crate::memtable::merge_tree::metrics::WriteMetrics; use crate::memtable::KeyValues; use crate::test_util::memtable_util::{ @@ -173,24 +192,24 @@ mod tests { build_key_values_with_ts_seq_values( metadata, "shard_builder".to_string(), - 3, - [30, 31].into_iter(), + 2, + [20, 21].into_iter(), [Some(0.0), Some(1.0)].into_iter(), 0, ), build_key_values_with_ts_seq_values( metadata, "shard_builder".to_string(), - 1, - [10, 11].into_iter(), + 0, + [0, 1].into_iter(), [Some(0.0), Some(1.0)].into_iter(), 1, ), build_key_values_with_ts_seq_values( metadata, "shard_builder".to_string(), - 2, - [20, 21].into_iter(), + 1, + [10, 11].into_iter(), [Some(0.0), Some(1.0)].into_iter(), 2, ), @@ -204,7 +223,10 @@ mod tests { let config = MergeTreeConfig::default(); let mut shard_builder = ShardBuilder::new(metadata.clone(), &config, 1); let mut metrics = WriteMetrics::default(); - assert!(shard_builder.finish(metadata.clone()).unwrap().is_none()); + assert!(shard_builder + .finish(metadata.clone(), &mut HashMap::new()) + .unwrap() + .is_none()); assert_eq!(1, shard_builder.current_shard_id); for key_values in &input { @@ -213,8 +235,41 @@ mod tests { shard_builder.write_with_key(&key, kv, &mut metrics); } } - let shard = shard_builder.finish(metadata).unwrap().unwrap(); + let shard = shard_builder + .finish(metadata, &mut HashMap::new()) + .unwrap() + .unwrap(); assert_eq!(1, shard.shard_id); assert_eq!(2, shard_builder.current_shard_id); } + + #[test] + fn test_write_read_shard_builder() { + let metadata = metadata_for_test(); + let input = input_with_key(&metadata); + let config = MergeTreeConfig::default(); + let mut shard_builder = ShardBuilder::new(metadata.clone(), &config, 1); + let mut metrics = WriteMetrics::default(); + + for key_values in &input { + for kv in key_values.iter() { + let key = encode_key_by_kv(&kv); + 
shard_builder.write_with_key(&key, kv, &mut metrics); + } + } + + let mut pk_weights = Vec::new(); + let mut reader = shard_builder.read(&mut pk_weights).unwrap(); + let mut timestamps = Vec::new(); + while reader.is_valid() { + let rb = reader.current_data_batch().slice_record_batch(); + let ts_array = rb.column(1); + let ts_slice = timestamp_array_to_i64_slice(ts_array); + timestamps.extend_from_slice(ts_slice); + + reader.next().unwrap(); + } + assert_eq!(vec![0, 1, 10, 11, 20, 21], timestamps); + assert_eq!(vec![2, 0, 1], pk_weights); + } } diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs index 524b3814866e..002f5d23bd03 100644 --- a/src/mito2/src/test_util/memtable_util.rs +++ b/src/mito2/src/test_util/memtable_util.rs @@ -22,7 +22,6 @@ use api::v1::{Row, Rows, SemanticType}; use datatypes::arrow::array::UInt64Array; use datatypes::data_type::ConcreteDataType; use datatypes::schema::ColumnSchema; -use datatypes::value::ValueRef; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef}; use store_api::storage::{ColumnId, RegionId, SequenceNumber}; use table::predicate::Predicate; @@ -291,16 +290,6 @@ pub(crate) fn encode_keys( } } -/// Encode one key. -pub(crate) fn encode_key(k0: &str, k1: u32) -> Vec { - let row_codec = McmpRowCodec::new(vec![ - SortField::new(ConcreteDataType::string_datatype()), - SortField::new(ConcreteDataType::uint32_datatype()), - ]); - let key = [ValueRef::String(k0), ValueRef::UInt32(k1)]; - row_codec.encode(key.into_iter()).unwrap() -} - /// Encode one key. pub(crate) fn encode_key_by_kv(key_value: &KeyValue) -> Vec { let row_codec = McmpRowCodec::new(vec![