fix: label mismatch
v0y4g3r committed May 13, 2024
1 parent 9d12496 commit bcfd5ef
Showing 4 changed files with 69 additions and 44 deletions.
73 changes: 47 additions & 26 deletions src/mito2/src/memtable/partition_tree/dict.rs
@@ -26,7 +26,7 @@ use crate::metrics::MEMTABLE_DICT_BYTES;
/// Maximum keys in a [DictBlock].
const MAX_KEYS_PER_BLOCK: u16 = 256;

type PkIndexMap = BTreeMap<Vec<u8>, PkIndex>;
type PkIndexMap = BTreeMap<Vec<u8>, (PkIndex, Option<Vec<u8>>)>;

/// Builder to build a key dictionary.
pub struct KeyDictBuilder {
@@ -66,10 +66,15 @@ impl KeyDictBuilder {
///
/// # Panics
/// Panics if the builder is full.
pub fn insert_key(&mut self, key: &[u8], metrics: &mut WriteMetrics) -> PkIndex {
pub fn insert_key(
&mut self,
full_primary_key: &[u8],
sparse_key: Option<&[u8]>,
metrics: &mut WriteMetrics,
) -> PkIndex {
assert!(!self.is_full());

if let Some(pk_index) = self.pk_to_index.get(key).copied() {
if let Some(pk_index) = self.pk_to_index.get(full_primary_key).map(|v| v.0) {
// Already in the builder.
return pk_index;
}
@@ -81,16 +86,22 @@ impl KeyDictBuilder {
}

// Safety: we have checked the buffer length.
let pk_index = self.key_buffer.push_key(key);
self.pk_to_index.insert(key.to_vec(), pk_index);
let pk_index = self.key_buffer.push_key(full_primary_key);
let (sparse_key, sparse_key_len) = if let Some(sparse_key) = sparse_key {
(Some(sparse_key.to_vec()), sparse_key.len())
} else {
(None, 0)
};
self.pk_to_index
.insert(full_primary_key.to_vec(), (pk_index, sparse_key));
self.num_keys += 1;

// Since we store the key twice so the bytes usage doubled.
metrics.key_bytes += key.len() * 2;
self.key_bytes_in_index += key.len();
metrics.key_bytes += full_primary_key.len() * 2 + sparse_key_len;
self.key_bytes_in_index += full_primary_key.len();

// Adds key size of index to the metrics.
MEMTABLE_DICT_BYTES.add(key.len() as i64);
MEMTABLE_DICT_BYTES.add((full_primary_key.len() + sparse_key_len) as i64);

pk_index
}
@@ -108,37 +119,47 @@ impl KeyDictBuilder {
}

/// Finishes the builder.
pub fn finish(&mut self, pk_to_index: &mut BTreeMap<Vec<u8>, PkIndex>) -> Option<KeyDict> {
pub fn finish(&mut self) -> Option<(KeyDict, BTreeMap<Vec<u8>, PkIndex>)> {
if self.key_buffer.is_empty() {
return None;
}
let mut pk_to_index_map = BTreeMap::new();

// Finishes current dict block and resets the pk index.
let dict_block = self.key_buffer.finish(true);
self.dict_blocks.push(dict_block);
// Computes key position and then alter pk index.
let mut key_positions = vec![0; self.pk_to_index.len()];
for (i, pk_index) in self.pk_to_index.values_mut().enumerate() {

for (i, (full_pk, (pk_index, sparse_key))) in (std::mem::take(&mut self.pk_to_index))
.into_iter()
.enumerate()
{
// The position of the i-th key is the old pk index.
key_positions[i] = *pk_index;
// Overwrites the pk index.
*pk_index = i as PkIndex;
key_positions[i] = pk_index;
pk_to_index_map.insert(full_pk, i as PkIndex);
if let Some(sparse_key) = sparse_key {
pk_to_index_map.insert(sparse_key, i as PkIndex);
}
}

self.num_keys = 0;
let key_bytes_in_index = self.key_bytes_in_index;
self.key_bytes_in_index = 0;
*pk_to_index = std::mem::take(&mut self.pk_to_index);

Some(KeyDict {
dict_blocks: std::mem::take(&mut self.dict_blocks),
key_positions,
key_bytes_in_index,
})
Some((
KeyDict {
dict_blocks: std::mem::take(&mut self.dict_blocks),
key_positions,
key_bytes_in_index,
},
pk_to_index_map,
))
}

/// Reads the builder.
pub fn read(&self) -> DictBuilderReader {
let sorted_pk_indices = self.pk_to_index.values().copied().collect();
let sorted_pk_indices = self.pk_to_index.values().map(|v| v.0).collect();
let block = self.key_buffer.finish_cloned();
let mut blocks = Vec::with_capacity(self.dict_blocks.len() + 1);
blocks.extend_from_slice(&self.dict_blocks);
@@ -394,7 +415,7 @@ mod tests {
let mut metrics = WriteMetrics::default();
for key in &keys {
assert!(!builder.is_full());
let pk_index = builder.insert_key(key, &mut metrics);
let pk_index = builder.insert_key(key, None, &mut metrics);
last_pk_index = Some(pk_index);
}
assert_eq!(num_keys - 1, last_pk_index.unwrap());
@@ -426,14 +447,14 @@ mod tests {
for i in 0..num_keys {
// Each key is 5 bytes.
let key = format!("{i:05}");
builder.insert_key(key.as_bytes(), &mut metrics);
builder.insert_key(key.as_bytes(), None, &mut metrics);
}
let key_bytes = num_keys as usize * 5;
assert_eq!(key_bytes * 2, metrics.key_bytes);
assert_eq!(key_bytes, builder.key_bytes_in_index);
assert_eq!(8850, builder.memory_size());

let dict = builder.finish(&mut BTreeMap::new()).unwrap();
let (dict, _) = builder.finish().unwrap();
assert_eq!(0, builder.key_bytes_in_index);
assert_eq!(key_bytes, dict.key_bytes_in_index);
assert!(dict.shared_memory_size() > key_bytes);
@@ -446,12 +467,12 @@ mod tests {
for i in 0..MAX_KEYS_PER_BLOCK * 2 {
let key = format!("{i:010}");
assert!(!builder.is_full());
builder.insert_key(key.as_bytes(), &mut metrics);
builder.insert_key(key.as_bytes(), None, &mut metrics);
}
assert!(builder.is_full());
builder.finish(&mut BTreeMap::new());
builder.finish();

assert!(!builder.is_full());
assert_eq!(0, builder.insert_key(b"a0", &mut metrics));
assert_eq!(0, builder.insert_key(b"a0", None, &mut metrics));
}
}
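
Taken together, the dict.rs hunks above change the KeyDictBuilder API in two ways: insert_key now takes the full primary key plus an optional sparse key, and finish returns the pk-to-index map alongside the dictionary instead of filling a caller-provided BTreeMap. A minimal usage sketch follows; it assumes the module types (KeyDictBuilder, WriteMetrics, PkIndex) are in scope as in the tests above, and KeyDictBuilder::new(capacity) plus the key literals are illustrative assumptions rather than code from this commit.

fn sketch_new_dict_builder_api() {
    // Assumed constructor: a builder with room for a handful of keys.
    let mut builder = KeyDictBuilder::new(16);
    let mut metrics = WriteMetrics::default();

    // Dense path: only the full primary key is recorded.
    builder.insert_key(b"full-pk-0", None, &mut metrics);
    // Sparse path: the sparse encoding is stored alongside the full key so that
    // `finish` can map both encodings to the same `PkIndex`.
    builder.insert_key(b"full-pk-1", Some(b"sparse-pk-1".as_slice()), &mut metrics);

    // `finish` now returns the dictionary together with the pk-to-index map.
    if let Some((dict, pk_to_index)) = builder.finish() {
        // Both encodings of the second key resolve to the same index.
        assert_eq!(
            pk_to_index.get(b"full-pk-1".as_slice()),
            pk_to_index.get(b"sparse-pk-1".as_slice())
        );
        assert!(dict.shared_memory_size() > 0);
    }
}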
11 changes: 7 additions & 4 deletions src/mito2/src/memtable/partition_tree/partition.rs
@@ -89,15 +89,18 @@ impl Partition {
let sparse_key = primary_key.clone();
primary_key.clear();
row_codec.encode_to_vec(key_value.primary_keys(), primary_key)?;
let pk_id = inner
.shard_builder
.write_with_key(primary_key, &key_value, metrics);
let pk_id = inner.shard_builder.write_with_key(
primary_key,
Some(&sparse_key),
&key_value,
metrics,
);
inner.pk_to_pk_id.insert(sparse_key, pk_id);
} else {
// `primary_key` is already the full primary key.
let pk_id = inner
.shard_builder
.write_with_key(primary_key, &key_value, metrics);
.write_with_key(primary_key, None, &key_value, metrics);
inner.pk_to_pk_id.insert(std::mem::take(primary_key), pk_id);
};

5 changes: 2 additions & 3 deletions src/mito2/src/memtable/partition_tree/shard.rs
@@ -423,7 +423,6 @@ impl Node for ShardNode {

#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use std::sync::Arc;

use super::*;
@@ -488,10 +487,10 @@ mod tests {
encode_keys(&metadata, kvs, &mut keys);
}
for key in &keys {
dict_builder.insert_key(key, &mut metrics);
dict_builder.insert_key(key, None, &mut metrics);
}

let dict = dict_builder.finish(&mut BTreeMap::new()).unwrap();
let (dict, _) = dict_builder.finish().unwrap();
let data_parts = DataParts::new(metadata, DATA_INIT_CAP, true);

Shard::new(
24 changes: 13 additions & 11 deletions src/mito2/src/memtable/partition_tree/shard_builder.rs
@@ -14,7 +14,7 @@

//! Builder of a shard.
use std::collections::{BTreeMap, HashMap};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

@@ -71,12 +71,15 @@ impl ShardBuilder {
/// Write a key value with its encoded primary key.
pub fn write_with_key(
&mut self,
primary_key: &[u8],
full_primary_key: &[u8],
sparse_key: Option<&[u8]>,
key_value: &KeyValue,
metrics: &mut WriteMetrics,
) -> PkId {
// Safety: we check whether the builder need to freeze before.
let pk_index = self.dict_builder.insert_key(primary_key, metrics);
let pk_index = self
.dict_builder
.insert_key(full_primary_key, sparse_key, metrics);
self.data_buffer.write_row(pk_index, key_value);
PkId {
shard_id: self.current_shard_id,
@@ -106,10 +109,8 @@ impl ShardBuilder {
return Ok(None);
}

let mut pk_to_index = BTreeMap::new();
let key_dict = self.dict_builder.finish(&mut pk_to_index);
let data_part = match &key_dict {
Some(dict) => {
let (data_part, key_dict) = match self.dict_builder.finish() {
Some((dict, pk_to_index)) => {
// Adds mapping to the map.
pk_to_pk_id.reserve(pk_to_index.len());
for (k, pk_index) in pk_to_index {
@@ -123,11 +124,12 @@
}

let pk_weights = dict.pk_weights_to_sort_data();
self.data_buffer.freeze(Some(&pk_weights), true)?
let part = self.data_buffer.freeze(Some(&pk_weights), true)?;
(part, Some(dict))
}
None => {
let pk_weights = [0];
self.data_buffer.freeze(Some(&pk_weights), true)?
(self.data_buffer.freeze(Some(&pk_weights), true)?, None)
}
};

@@ -367,7 +369,7 @@ mod tests {
for key_values in &input {
for kv in key_values.iter() {
let key = encode_key_by_kv(&kv);
shard_builder.write_with_key(&key, &kv, &mut metrics);
shard_builder.write_with_key(&key, None, &kv, &mut metrics);
}
}
let shard = shard_builder
@@ -389,7 +391,7 @@ mod tests {
for key_values in &input {
for kv in key_values.iter() {
let key = encode_key_by_kv(&kv);
shard_builder.write_with_key(&key, &kv, &mut metrics);
shard_builder.write_with_key(&key, None, &kv, &mut metrics);
}
}

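For context on how the map returned by KeyDictBuilder::finish is consumed: the loop body that folds pk_to_index into pk_to_pk_id is collapsed in the ShardBuilder::finish hunk above, so the sketch below is an assumption rather than the actual code. It presumes pk_to_pk_id is a HashMap<Vec<u8>, PkId>, that PkId has shard_id and pk_index fields as suggested by write_with_key, and that ShardId names the shard identifier type. The point it illustrates is that both the full and the sparse encoding of a key end up mapped to the same PkId, so later lookups by either encoding resolve to the same entry.

use std::collections::{BTreeMap, HashMap};

// Hypothetical helper, not part of the diff: the collapsed loop in
// `ShardBuilder::finish` presumably performs an equivalent fold.
fn fold_pk_indices(
    shard_id: ShardId,
    pk_to_index: BTreeMap<Vec<u8>, PkIndex>,
    pk_to_pk_id: &mut HashMap<Vec<u8>, PkId>,
) {
    pk_to_pk_id.reserve(pk_to_index.len());
    for (key, pk_index) in pk_to_index {
        // `key` is either a full primary key or a sparse key; both carry the
        // same `PkIndex`, so they point at the same shard entry.
        pk_to_pk_id.insert(key, PkId { shard_id, pk_index });
    }
}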
