Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Defines more structs and methods for a partitioned merge tree #3348

Merged
merged 8 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/mito2/src/memtable/merge_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ impl Memtable for MergeTreeMemtable {
// TODO(yingwen): Validate schema while inserting rows.

let mut metrics = WriteMetrics::default();
let res = self.tree.write(kvs, &mut metrics);
let mut pk_buffer = Vec::new();
let res = self.tree.write(kvs, &mut pk_buffer, &mut metrics);

self.update_stats(&metrics);

Expand Down
146 changes: 144 additions & 2 deletions src/mito2/src/memtable/merge_tree/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,20 @@
//!
//! We only support partitioning the tree by pre-defined internal columns.

use std::collections::HashSet;
use std::sync::{Arc, RwLock};

use common_recordbatch::filter::SimpleFilterEvaluator;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
use store_api::storage::ColumnId;

use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::metrics::WriteMetrics;
use crate::memtable::merge_tree::shard::Shard;
use crate::memtable::merge_tree::shard_builder::ShardBuilder;
use crate::memtable::merge_tree::ShardId;
use crate::memtable::merge_tree::{MergeTreeConfig, PkId, ShardId};

/// Key of a partition.
pub type PartitionKey = u32;
Expand All @@ -30,13 +39,146 @@ pub struct Partition {
inner: RwLock<Inner>,
}

impl Partition {
    /// Creates a new partition.
    ///
    /// Not implemented yet: calling this panics via `unimplemented!()`.
    pub fn new(_metadata: RegionMetadataRef, _config: &MergeTreeConfig) -> Self {
        unimplemented!()
    }

    /// Writes `key_value` to the partition under the encoded `primary_key`.
    ///
    /// A key only exists in one shard: when a frozen shard already knows the
    /// key, the row is routed to that shard. Otherwise the row goes to the
    /// shard builder, which is first finished into a new shard when it reports
    /// that it should freeze.
    ///
    /// # Errors
    ///
    /// Propagates errors from writing to the shard, from finishing the shard
    /// builder, and from writing to the shard builder.
    ///
    /// # Panics
    ///
    /// Panics if the inner lock is poisoned.
    pub fn write_with_key(
        &self,
        primary_key: &[u8],
        key_value: KeyValue,
        metrics: &mut WriteMetrics,
    ) -> Result<()> {
        let mut inner = self.inner.write().unwrap();
        // Now we ensure one key only exists in one shard.
        if let Some(pk_id) = inner.find_key_in_shards(primary_key) {
            // Key already in shards.
            // NOTE(review): `metrics` is not forwarded on this path, so writes
            // to an existing key are not accounted for here — confirm intended.
            return inner.write_to_shard(pk_id, key_value);
        }

        if inner.shard_builder.should_freeze() {
            // The finished shard takes the current active id; the id is only
            // advanced once finishing succeeded.
            let shard_id = inner.active_shard_id;
            let shard = inner.shard_builder.finish(shard_id)?;
            inner.active_shard_id += 1;
            inner.shards.push(shard);
        }

        // Write to the shard builder.
        inner
            .shard_builder
            .write_with_key(primary_key, key_value, metrics)
    }

    /// Writes `key_value` to the partition without a primary key.
    ///
    /// Such rows always go to the first shard, which is created without a
    /// dictionary on first use, and are addressed by a dummy pk id.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the shard write.
    ///
    /// # Panics
    ///
    /// Panics if the inner lock is poisoned.
    pub fn write_no_key(&self, key_value: KeyValue, metrics: &mut WriteMetrics) -> Result<()> {
        let mut inner = self.inner.write().unwrap();
        // If no primary key, always write to the first shard.
        if inner.shards.is_empty() {
            let shard_id = inner.active_shard_id;
            inner.shards.push(Shard::new_no_dict(shard_id));
            inner.active_shard_id += 1;
        }

        // A dummy pk id.
        // NOTE(review): the shard id is derived from `active_shard_id` while
        // the row is written to `shards[0]`; both agree only while the
        // partition holds a single shard — confirm keyed and keyless writes
        // are never mixed on one partition.
        let pk_id = PkId {
            shard_id: inner.active_shard_id - 1,
            pk_index: 0,
        };
        inner.shards[0].write_key_value(pk_id, key_value, metrics)
    }

    /// Scans data in the partition.
    ///
    /// Not implemented yet: calling this panics via `unimplemented!()`.
    pub fn scan(
        &self,
        _projection: HashSet<ColumnId>,
        _filters: Vec<SimpleFilterEvaluator>,
    ) -> Result<PartitionReader> {
        unimplemented!()
    }

    /// Freezes the partition.
    ///
    /// Not implemented yet: calling this panics via `unimplemented!()`.
    pub fn freeze(&self) -> Result<()> {
        unimplemented!()
    }

    /// Forks the partition.
    ///
    /// Not implemented yet: calling this panics via `unimplemented!()`.
    pub fn fork(&self, _metadata: &RegionMetadataRef) -> Partition {
        unimplemented!()
    }

    /// Returns true if the partition has data.
    ///
    /// Not implemented yet: calling this panics via `unimplemented!()`.
    pub fn has_data(&self) -> bool {
        unimplemented!()
    }

    /// Returns shared memory size of the partition.
    ///
    /// Not implemented yet: calling this panics via `unimplemented!()`.
    pub fn shared_memory_size(&self) -> usize {
        unimplemented!()
    }

    /// Gets the partition key from the key value.
    ///
    /// Returns the default partition key when `is_partitioned` is false or
    /// when the key value has no primary key column. Otherwise the first
    /// primary key value is read as a `u32`.
    ///
    /// # Panics
    ///
    /// Panics if the first primary key value cannot be obtained as a `u32`.
    pub(crate) fn get_partition_key(key_value: &KeyValue, is_partitioned: bool) -> PartitionKey {
        if !is_partitioned {
            return PartitionKey::default();
        }

        let Some(value) = key_value.primary_keys().next() else {
            return PartitionKey::default();
        };

        // The return type leaves no way to report a failure, so state the
        // violated invariant in the panic message instead of a bare unwrap.
        value
            .as_u32()
            .expect("partition column value should be readable as a u32")
            .expect("partition column value should be a non-null u32")
    }

    /// Returns true if the region can be partitioned.
    ///
    /// That is the case when the first primary key column of the region is a
    /// partition column.
    pub(crate) fn has_multi_partitions(metadata: &RegionMetadataRef) -> bool {
        metadata
            .primary_key_columns()
            .next()
            // Reuse the helper so the partition column name is checked in one place.
            .map(|meta| Self::is_partition_column(&meta.column_schema.name))
            .unwrap_or(false)
    }

    /// Returns true if this is a partition column.
    pub(crate) fn is_partition_column(name: &str) -> bool {
        name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME
    }
}

/// Reader to scan rows in a partition.
///
/// It can merge rows from multiple shards.
///
/// Currently an empty placeholder without any fields.
pub struct PartitionReader {}

/// Reference-counted (`Arc`) handle to a [`Partition`].
pub type PartitionRef = Arc<Partition>;

/// Inner struct of the partition.
///
/// A key only exists in one shard.
struct Inner {
    /// Shard whose dictionary is active.
    shard_builder: ShardBuilder,
    // NOTE(review): not referenced anywhere in the visible code; it may have
    // been superseded by `active_shard_id` — confirm.
    next_shard_id: ShardId,
    /// Id handed to the next shard that gets created; it is incremented each
    /// time a shard is added to `shards`.
    active_shard_id: ShardId,
    /// Shards with frozen dictionary.
    shards: Vec<Shard>,
}

impl Inner {
    /// Looks up `primary_key` in the shards, in order, and returns the pk id
    /// reported by the first shard that knows the key, or `None` if no shard
    /// does.
    fn find_key_in_shards(&self, primary_key: &[u8]) -> Option<PkId> {
        self.shards
            .iter()
            .find_map(|candidate| candidate.find_key(primary_key))
    }

    /// Writes a key value to the shard addressed by the pk id.
    ///
    /// Not implemented yet: calling this panics via `unimplemented!()`.
    fn write_to_shard(&mut self, _pk_id: PkId, _key_value: KeyValue) -> Result<()> {
        unimplemented!()
    }
}
44 changes: 43 additions & 1 deletion src/mito2/src/memtable/merge_tree/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,17 @@

//! Shard in a partition.

use std::collections::HashSet;

use common_recordbatch::filter::SimpleFilterEvaluator;
use store_api::storage::ColumnId;

use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::data::DataParts;
use crate::memtable::merge_tree::dict::KeyDictRef;
use crate::memtable::merge_tree::ShardId;
use crate::memtable::merge_tree::metrics::WriteMetrics;
use crate::memtable::merge_tree::{PkId, ShardId};

/// Shard stores data related to the same key dictionary.
pub struct Shard {
Expand All @@ -26,3 +34,37 @@ pub struct Shard {
/// Data in the shard.
data_parts: DataParts,
}

impl Shard {
    /// Returns a shard without dictionary.
    ///
    /// Not implemented yet: calling this panics via `unimplemented!()`.
    pub fn new_no_dict(_shard_id: ShardId) -> Shard {
        unimplemented!()
    }

    /// Returns the pk id of the key if it exists.
    ///
    /// Not implemented yet: calling this panics via `unimplemented!()`.
    pub fn find_key(&self, _key: &[u8]) -> Option<PkId> {
        unimplemented!()
    }

    /// Writes a key value into the shard.
    ///
    /// Not implemented yet: calling this panics via `unimplemented!()`.
    pub fn write_key_value(
        &mut self,
        _pk_id: PkId,
        _key_value: KeyValue,
        _metrics: &mut WriteMetrics,
    ) -> Result<()> {
        unimplemented!()
    }

    /// Scans the shard.
    ///
    /// Not implemented yet: calling this panics via `unimplemented!()`.
    pub fn scan(
        &self,
        _projection: &HashSet<ColumnId>,
        _filters: &[SimpleFilterEvaluator],
    ) -> ShardReader {
        unimplemented!()
    }
}

/// Reader to read rows in a shard.
///
/// Currently an empty placeholder without any fields.
pub struct ShardReader {}
44 changes: 44 additions & 0 deletions src/mito2/src/memtable/merge_tree/shard_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@

//! Builder of a shard.

use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::data::DataBuffer;
use crate::memtable::merge_tree::dict::KeyDictBuilder;
use crate::memtable::merge_tree::metrics::WriteMetrics;
use crate::memtable::merge_tree::shard::Shard;
use crate::memtable::merge_tree::ShardId;

/// Builder to write keys and data to a shard whose key dictionary
/// is still active.
Expand All @@ -24,4 +29,43 @@ pub struct ShardBuilder {
dict_builder: KeyDictBuilder,
/// Buffer to store data.
data_buffer: DataBuffer,
/// Max keys in an index shard.
index_max_keys_per_shard: usize,
/// Number of rows to freeze a data part.
data_freeze_threshold: usize,
}

impl ShardBuilder {
    /// Writes a key value with its encoded primary key.
    ///
    /// Not implemented yet: calling this panics via `unimplemented!()`.
    pub fn write_with_key(
        &mut self,
        _key: &[u8],
        _key_value: KeyValue,
        _metrics: &mut WriteMetrics,
    ) -> Result<()> {
        unimplemented!()
    }

    /// Returns true if the builder is empty.
    ///
    /// Not implemented yet: calling this panics via `unimplemented!()`.
    pub fn is_empty(&self) -> bool {
        unimplemented!()
    }

    /// Returns true if the builder needs to be frozen.
    ///
    /// Not implemented yet: calling this panics via `unimplemented!()`.
    pub fn should_freeze(&self) -> bool {
        unimplemented!()
    }

    /// Builds a new shard and resets the builder.
    ///
    /// Not implemented yet: calling this panics via `unimplemented!()`.
    pub fn finish(&mut self, _shard_id: ShardId) -> Result<Shard> {
        unimplemented!()
    }

    /// Scans the shard builder.
    ///
    /// Not implemented yet: calling this panics via `unimplemented!()`.
    pub fn scan(&mut self, _shard_id: ShardId) -> Result<ShardBuilderReader> {
        unimplemented!()
    }
}

/// Reader to scan a shard builder.
///
/// Currently an empty placeholder without any fields.
pub struct ShardBuilderReader {}
Loading