Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement write and fork for the new memtable #3357

Merged
merged 14 commits into from
Feb 23, 2024
82 changes: 82 additions & 0 deletions src/mito2/src/memtable/merge_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,3 +262,85 @@ impl MemtableBuilder for MergeTreeMemtableBuilder {
))
}
}

#[cfg(test)]
mod tests {
use common_time::Timestamp;

use super::*;
use crate::test_util::memtable_util;

#[test]
fn test_memtable_sorted_input() {
write_sorted_input(true);
write_sorted_input(false);
}

fn write_sorted_input(has_pk: bool) {
let metadata = if has_pk {
memtable_util::metadata_with_primary_key(vec![1, 0], true)
} else {
memtable_util::metadata_with_primary_key(vec![], false)
};
let timestamps = (0..100).collect::<Vec<_>>();
let kvs =
memtable_util::build_key_values(&metadata, "hello".to_string(), 42, &timestamps, 1);
let memtable = MergeTreeMemtable::new(1, metadata, None, &MergeTreeConfig::default());
memtable.write(&kvs).unwrap();

// TODO(yingwen): Test iter.

let stats = memtable.stats();
assert!(stats.bytes_allocated() > 0);
assert_eq!(
Some((
Timestamp::new_millisecond(0),
Timestamp::new_millisecond(99)
)),
stats.time_range()
);
}

#[test]
fn test_memtable_unsorted_input() {
write_iter_unsorted_input(true);
write_iter_unsorted_input(false);
}

fn write_iter_unsorted_input(has_pk: bool) {
let metadata = if has_pk {
memtable_util::metadata_with_primary_key(vec![1, 0], true)
} else {
memtable_util::metadata_with_primary_key(vec![], false)
};
let memtable =
MergeTreeMemtable::new(1, metadata.clone(), None, &MergeTreeConfig::default());

let kvs = memtable_util::build_key_values(
&metadata,
"hello".to_string(),
0,
&[1, 3, 7, 5, 6],
0, // sequence 0, 1, 2, 3, 4
);
memtable.write(&kvs).unwrap();

let kvs = memtable_util::build_key_values(
&metadata,
"hello".to_string(),
0,
&[5, 2, 4, 0, 7],
5, // sequence 5, 6, 7, 8, 9
);
memtable.write(&kvs).unwrap();

// TODO(yingwen): Test iter.

let stats = memtable.stats();
assert!(stats.bytes_allocated() > 0);
assert_eq!(
Some((Timestamp::new_millisecond(0), Timestamp::new_millisecond(7))),
stats.time_range()
);
}
}
32 changes: 17 additions & 15 deletions src/mito2/src/memtable/merge_tree/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,13 @@ use crate::error;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::merger::{DataNode, DataSource, Merger};
use crate::memtable::merge_tree::{PkId, PkIndex};
use crate::memtable::merge_tree::PkIndex;

const PK_INDEX_COLUMN_NAME: &str = "__pk_index";

/// Initial capacity for the data buffer.
pub(crate) const DATA_INIT_CAP: usize = 8;

/// Data part batches returns by `DataParts::read`.
#[derive(Debug, Clone)]
pub struct DataBatch {
Expand Down Expand Up @@ -128,9 +131,9 @@ impl DataBuffer {
}

/// Writes a row to data buffer.
pub fn write_row(&mut self, pk_id: PkId, kv: KeyValue) {
pub fn write_row(&mut self, pk_index: PkIndex, kv: KeyValue) {
self.ts_builder.push_value_ref(kv.timestamp());
self.pk_index_builder.push(Some(pk_id.pk_index));
self.pk_index_builder.push(Some(pk_index));
self.sequence_builder.push(Some(kv.sequence()));
self.op_type_builder.push(Some(kv.op_type() as u8));

Expand Down Expand Up @@ -662,9 +665,9 @@ pub struct ParquetPart {
/// Data parts under a shard.
pub struct DataParts {
/// The active writing buffer.
pub(crate) active: DataBuffer,
active: DataBuffer,
/// immutable (encoded) parts.
pub(crate) frozen: Vec<DataPart>,
frozen: Vec<DataPart>,
}

impl DataParts {
Expand All @@ -675,9 +678,14 @@ impl DataParts {
}
}

/// Writes one row into active part.
pub fn write_row(&mut self, pk_id: PkId, kv: KeyValue) {
self.active.write_row(pk_id, kv)
pub(crate) fn with_frozen(mut self, frozen: Vec<DataPart>) -> Self {
self.frozen = frozen;
self
}

/// Writes a row into parts.
pub fn write_row(&mut self, pk_index: PkIndex, kv: KeyValue) {
self.active.write_row(pk_index, kv)
}

/// Freezes the active data buffer into frozen data parts.
Expand Down Expand Up @@ -932,13 +940,7 @@ mod tests {
);

for kv in kvs.iter() {
buffer.write_row(
PkId {
shard_id: 0,
pk_index,
},
kv,
);
buffer.write_row(pk_index, kv);
}
}

Expand Down
99 changes: 62 additions & 37 deletions src/mito2/src/memtable/merge_tree/dict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,21 @@ impl KeyDictBuilder {
self.pk_to_index.get(key).copied()
}

/// Returns true if the builder is full.
pub fn is_full(&self) -> bool {
self.num_keys >= self.capacity
}

/// Adds the key to the builder and returns its index if the builder is not full.
///
/// Returns `None` if the builder is full.
pub fn try_insert_key(&mut self, key: &[u8], metrics: &mut WriteMetrics) -> Option<PkIndex> {
/// # Panics
/// Panics if the builder is full.
pub fn insert_key(&mut self, key: &[u8], metrics: &mut WriteMetrics) -> PkIndex {
assert!(!self.is_full());

if let Some(pk_index) = self.pk_to_index.get(key).copied() {
// Already in the builder.
return Some(pk_index);
}

// A new key.
if self.num_keys >= self.capacity {
// The builder is full.
return None;
return pk_index;
}

if self.key_buffer.len() >= MAX_KEYS_PER_BLOCK.into() {
Expand All @@ -91,7 +93,7 @@ impl KeyDictBuilder {
metrics.key_bytes += key.len() * 2;
self.key_bytes_in_index += key.len();

Some(pk_index)
pk_index
}

/// Memory size of the builder.
Expand Down Expand Up @@ -129,11 +131,12 @@ impl KeyDictBuilder {
pk_to_index,
dict_blocks: std::mem::take(&mut self.dict_blocks),
key_positions,
key_bytes_in_index: self.key_bytes_in_index,
})
}

/// Scans the builder.
pub fn scan(&self) -> DictBuilderReader {
/// Reads the builder.
pub fn read(&self) -> DictBuilderReader {
let sorted_pk_indices = self.pk_to_index.values().copied().collect();
let block = self.key_buffer.finish_cloned();
let mut blocks = Vec::with_capacity(self.dict_blocks.len() + 1);
Expand Down Expand Up @@ -162,38 +165,46 @@ impl DictBuilderReader {
}
}

/// Returns true if the item in the reader is valid.
pub fn is_valid(&self) -> bool {
self.offset < self.sorted_pk_indices.len()
/// Returns the number of keys.
pub fn num_keys(&self) -> usize {
self.sorted_pk_indices.len()
}

/// Gets the i-th pk index.
pub fn pk_index(&self, offset: usize) -> PkIndex {
self.sorted_pk_indices[offset]
}

/// Returns current key.
pub fn current_key(&self) -> &[u8] {
let pk_index = self.current_pk_index();
/// Gets the i-th key.
pub fn key(&self, offset: usize) -> &[u8] {
let pk_index = self.pk_index(offset);
self.key_by_pk_index(pk_index)
}

/// Returns current [PkIndex] of the key.
pub fn current_pk_index(&self) -> PkIndex {
assert!(self.is_valid());
self.sorted_pk_indices[self.offset]
/// Gets the key by the pk index.
pub fn key_by_pk_index(&self, pk_index: PkIndex) -> &[u8] {
let block_idx = pk_index / MAX_KEYS_PER_BLOCK;
self.blocks[block_idx as usize].key_by_pk_index(pk_index)
}

/// Advances the reader.
pub fn next(&mut self) {
assert!(self.is_valid());
self.offset += 1;
/// Returns pk weights to sort a data part and replaces pk indices.
pub(crate) fn pk_weights_to_sort_data(&self) -> Vec<u16> {
compute_pk_weights(&self.sorted_pk_indices)
}

/// Returns pk indices sorted by keys.
pub(crate) fn sorted_pk_index(&self) -> &[PkIndex] {
&self.sorted_pk_indices
}
}

fn key_by_pk_index(&self, pk_index: PkIndex) -> &[u8] {
let block_idx = pk_index / MAX_KEYS_PER_BLOCK;
self.blocks[block_idx as usize].key_by_pk_index(pk_index)
/// Returns pk weights to sort a data part and replaces pk indices.
fn compute_pk_weights(sorted_pk_indices: &[PkIndex]) -> Vec<u16> {
let mut pk_weights = vec![0; sorted_pk_indices.len()];
for (weight, pk_index) in sorted_pk_indices.iter().enumerate() {
pk_weights[*pk_index as usize] = weight as u16;
}
pk_weights
}

/// A key dictionary.
Expand All @@ -206,6 +217,7 @@ pub struct KeyDict {
dict_blocks: Vec<DictBlock>,
/// Maps pk index to position of the key in [Self::dict_blocks].
key_positions: Vec<PkIndex>,
key_bytes_in_index: usize,
}

pub type KeyDictRef = Arc<KeyDict>;
Expand All @@ -220,6 +232,21 @@ impl KeyDict {
let block_index = position / MAX_KEYS_PER_BLOCK;
self.dict_blocks[block_index as usize].key_by_pk_index(position)
}

/// Gets the pk index by the key.
pub fn get_pk_index(&self, key: &[u8]) -> Option<PkIndex> {
self.pk_to_index.get(key).copied()
}

/// Returns pk weights to sort a data part and replaces pk indices.
pub(crate) fn pk_weights_to_sort_data(&self) -> Vec<u16> {
compute_pk_weights(&self.key_positions)
}

/// Returns the shared memory size.
pub(crate) fn shared_memory_size(&self) -> usize {
self.key_bytes_in_index
}
}

/// Buffer to store unsorted primary keys.
Expand Down Expand Up @@ -364,7 +391,8 @@ mod tests {
let mut last_pk_index = None;
let mut metrics = WriteMetrics::default();
for key in &keys {
let pk_index = builder.try_insert_key(key, &mut metrics).unwrap();
assert!(!builder.is_full());
let pk_index = builder.insert_key(key, &mut metrics);
last_pk_index = Some(pk_index);
}
assert_eq!(num_keys - 1, last_pk_index.unwrap());
Expand All @@ -379,10 +407,9 @@ mod tests {
expect.sort_unstable_by(|a, b| a.0.cmp(&b.0));

let mut result = Vec::with_capacity(expect.len());
let mut reader = builder.scan();
while reader.is_valid() {
result.push((reader.current_key().to_vec(), reader.current_pk_index()));
reader.next();
let reader = builder.read();
for i in 0..reader.num_keys() {
result.push((reader.key(i).to_vec(), reader.pk_index(i)));
}
assert_eq!(expect, result);
}
Expand All @@ -397,9 +424,7 @@ mod tests {
for i in 0..num_keys {
// Each key is 5 bytes.
let key = format!("{i:05}");
builder
.try_insert_key(key.as_bytes(), &mut metrics)
.unwrap();
builder.insert_key(key.as_bytes(), &mut metrics);
}
// num_keys * 5 * 2
assert_eq!(5130, metrics.key_bytes);
Expand Down
9 changes: 1 addition & 8 deletions src/mito2/src/memtable/merge_tree/merger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,6 @@ mod tests {

use super::*;
use crate::memtable::merge_tree::data::DataBuffer;
use crate::memtable::merge_tree::PkId;
use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};

fn write_rows_to_buffer(
Expand All @@ -429,13 +428,7 @@ mod tests {
);

for kv in kvs.iter() {
buffer.write_row(
PkId {
shard_id: 0,
pk_index,
},
kv,
);
buffer.write_row(pk_index, kv);
}

*sequence += rows;
Expand Down
Loading