Skip to content

Commit

Permalink
feat: Use a partition level map to look up pk index (GreptimeTeam#3400)
Browse files Browse the repository at this point in the history
* feat: partition level map

* test: test shard and builder

* fix: do not use pk index from shard builder

* feat: add multi key test

* fix: freeze shard before finding pk in shards
  • Loading branch information
evenyag authored Feb 28, 2024
1 parent f3d69e9 commit b97f957
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 120 deletions.
69 changes: 69 additions & 0 deletions src/mito2/src/memtable/merge_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,4 +459,73 @@ mod tests {
}
assert_eq!(expect, v0_all);
}

#[test]
fn test_write_iter_multi_keys() {
write_iter_multi_keys(1, 100);
write_iter_multi_keys(2, 100);
write_iter_multi_keys(4, 100);
write_iter_multi_keys(8, 5);
write_iter_multi_keys(2, 10);
}

fn write_iter_multi_keys(max_keys: usize, freeze_threshold: usize) {
let metadata = memtable_util::metadata_with_primary_key(vec![1, 0], true);
let memtable = MergeTreeMemtable::new(
1,
metadata.clone(),
None,
&MergeTreeConfig {
index_max_keys_per_shard: max_keys,
data_freeze_threshold: freeze_threshold,
..Default::default()
},
);

let mut data = Vec::new();
// 4 partitions, each partition 4 pks.
for i in 0..4 {
for j in 0..4 {
// key: i, a{j}
let timestamps = [11, 13, 1, 5, 3, 7, 9];
let key = format!("a{j}");
let kvs =
memtable_util::build_key_values(&metadata, key.clone(), i, &timestamps, 0);
memtable.write(&kvs).unwrap();
for ts in timestamps {
data.push((i, key.clone(), ts));
}
}
for j in 0..4 {
// key: i, a{j}
let timestamps = [10, 2, 4, 8, 6];
let key = format!("a{j}");
let kvs =
memtable_util::build_key_values(&metadata, key.clone(), i, &timestamps, 200);
memtable.write(&kvs).unwrap();
for ts in timestamps {
data.push((i, key.clone(), ts));
}
}
}
data.sort_unstable();

let expect = data.into_iter().map(|x| x.2).collect::<Vec<_>>();
let iter = memtable.iter(None, None).unwrap();
let read = iter
.flat_map(|batch| {
batch
.unwrap()
.timestamps()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.iter_data()
.collect::<Vec<_>>()
.into_iter()
})
.map(|v| v.unwrap().0.value())
.collect::<Vec<_>>();
assert_eq!(expect, read);
}
}
21 changes: 6 additions & 15 deletions src/mito2/src/memtable/merge_tree/dict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,19 +108,17 @@ impl KeyDictBuilder {
}

/// Finishes the builder.
pub fn finish(&mut self) -> Option<KeyDict> {
pub fn finish(&mut self, pk_to_index: &mut BTreeMap<Vec<u8>, PkIndex>) -> Option<KeyDict> {
if self.key_buffer.is_empty() {
return None;
}

// Finishes current dict block and resets the pk index.
let dict_block = self.key_buffer.finish(true);
self.dict_blocks.push(dict_block);
// Takes the pk to index map.
let mut pk_to_index = std::mem::take(&mut self.pk_to_index);
// Computes key position and then alter pk index.
let mut key_positions = vec![0; pk_to_index.len()];
for (i, pk_index) in pk_to_index.values_mut().enumerate() {
let mut key_positions = vec![0; self.pk_to_index.len()];
for (i, pk_index) in self.pk_to_index.values_mut().enumerate() {
// The position of the i-th key is the old pk index.
key_positions[i] = *pk_index;
// Overwrites the pk index.
Expand All @@ -129,9 +127,9 @@ impl KeyDictBuilder {
self.num_keys = 0;
let key_bytes_in_index = self.key_bytes_in_index;
self.key_bytes_in_index = 0;
*pk_to_index = std::mem::take(&mut self.pk_to_index);

Some(KeyDict {
pk_to_index,
dict_blocks: std::mem::take(&mut self.dict_blocks),
key_positions,
key_bytes_in_index,
Expand Down Expand Up @@ -214,8 +212,6 @@ fn compute_pk_weights(sorted_pk_indices: &[PkIndex], pk_weights: &mut Vec<u16>)
#[derive(Default)]
pub struct KeyDict {
// TODO(yingwen): We can use key_positions to do a binary search.
/// Key map to find a key in the dict.
pk_to_index: PkIndexMap,
/// Unsorted key blocks.
dict_blocks: Vec<DictBlock>,
/// Maps pk index to position of the key in [Self::dict_blocks].
Expand All @@ -237,11 +233,6 @@ impl KeyDict {
self.dict_blocks[block_index as usize].key_by_pk_index(position)
}

/// Gets the pk index by the key.
pub fn get_pk_index(&self, key: &[u8]) -> Option<PkIndex> {
self.pk_to_index.get(key).copied()
}

/// Returns pk weights to sort a data part and replaces pk indices.
pub(crate) fn pk_weights_to_sort_data(&self) -> Vec<u16> {
let mut pk_weights = Vec::with_capacity(self.key_positions.len());
Expand Down Expand Up @@ -442,7 +433,7 @@ mod tests {
assert_eq!(key_bytes, builder.key_bytes_in_index);
assert_eq!(8850, builder.memory_size());

let dict = builder.finish().unwrap();
let dict = builder.finish(&mut BTreeMap::new()).unwrap();
assert_eq!(0, builder.key_bytes_in_index);
assert_eq!(key_bytes, dict.key_bytes_in_index);
assert!(dict.shared_memory_size() > key_bytes);
Expand All @@ -458,7 +449,7 @@ mod tests {
builder.insert_key(key.as_bytes(), &mut metrics);
}
assert!(builder.is_full());
builder.finish();
builder.finish(&mut BTreeMap::new());

assert!(!builder.is_full());
assert_eq!(0, builder.insert_key(b"a0", &mut metrics));
Expand Down
67 changes: 40 additions & 27 deletions src/mito2/src/memtable/merge_tree/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
//!
//! We only support partitioning the tree by pre-defined internal columns.
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock};

use api::v1::SemanticType;
Expand Down Expand Up @@ -67,17 +67,18 @@ impl Partition {
metrics: &mut WriteMetrics,
) -> Result<()> {
let mut inner = self.inner.write().unwrap();
// Now we ensure one key only exists in one shard.
// Freeze the shard builder if needed.
if inner.shard_builder.should_freeze() {
inner.freeze_active_shard()?;
}

// Finds key in shards, now we ensure one key only exists in one shard.
if let Some(pk_id) = inner.find_key_in_shards(primary_key) {
// Key already in shards.
inner.write_to_shard(pk_id, key_value);
return Ok(());
}

if inner.shard_builder.should_freeze() {
inner.freeze_active_shard()?;
}

// Write to the shard builder.
inner
.shard_builder
Expand Down Expand Up @@ -142,26 +143,35 @@ impl Partition {
///
/// Must freeze the partition before fork.
pub fn fork(&self, metadata: &RegionMetadataRef, config: &MergeTreeConfig) -> Partition {
let inner = self.inner.read().unwrap();
debug_assert!(inner.shard_builder.is_empty());
// TODO(yingwen): TTL or evict shards.
let shard_builder = ShardBuilder::new(
metadata.clone(),
config,
inner.shard_builder.current_shard_id(),
);
let shards = inner
.shards
.iter()
.map(|shard| shard.fork(metadata.clone()))
.collect();
let (shards, shard_builder) = {
let inner = self.inner.read().unwrap();
debug_assert!(inner.shard_builder.is_empty());
let shard_builder = ShardBuilder::new(
metadata.clone(),
config,
inner.shard_builder.current_shard_id(),
);
let shards = inner
.shards
.iter()
.map(|shard| shard.fork(metadata.clone()))
.collect();

(shards, shard_builder)
};
let pk_to_pk_id = {
let mut inner = self.inner.write().unwrap();
std::mem::take(&mut inner.pk_to_pk_id)
};

Partition {
inner: RwLock::new(Inner {
metadata: metadata.clone(),
shard_builder,
shards,
num_rows: 0,
pk_to_pk_id,
frozen: false,
}),
dedup: self.dedup,
}
Expand Down Expand Up @@ -461,11 +471,14 @@ fn data_batch_to_batch(
/// A key only exists in one shard.
struct Inner {
metadata: RegionMetadataRef,
/// Map to index pk to pk id.
pk_to_pk_id: HashMap<Vec<u8>, PkId>,
/// Shard whose dictionary is active.
shard_builder: ShardBuilder,
/// Shards with frozen dictionary.
shards: Vec<Shard>,
num_rows: usize,
frozen: bool,
}

impl Inner {
Expand All @@ -479,20 +492,17 @@ impl Inner {
let shard_builder = ShardBuilder::new(metadata.clone(), config, current_shard_id);
Self {
metadata,
pk_to_pk_id: HashMap::new(),
shard_builder,
shards,
num_rows: 0,
frozen: false,
}
}

fn find_key_in_shards(&self, primary_key: &[u8]) -> Option<PkId> {
for shard in &self.shards {
if let Some(pkid) = shard.find_id_by_key(primary_key) {
return Some(pkid);
}
}

None
assert!(!self.frozen);
self.pk_to_pk_id.get(primary_key).copied()
}

fn write_to_shard(&mut self, pk_id: PkId, key_value: KeyValue) {
Expand All @@ -506,7 +516,10 @@ impl Inner {
}

fn freeze_active_shard(&mut self) -> Result<()> {
if let Some(shard) = self.shard_builder.finish(self.metadata.clone())? {
if let Some(shard) = self
.shard_builder
.finish(self.metadata.clone(), &mut self.pk_to_pk_id)?
{
self.shards.push(shard);
}
Ok(())
Expand Down
Loading

0 comments on commit b97f957

Please sign in to comment.