Skip to content

Commit

Permalink
feat: impl merge reader for DataParts
Browse files Browse the repository at this point in the history
  • Loading branch information
v0y4g3r committed Feb 22, 2024
1 parent 7c88d72 commit 5ac6b35
Show file tree
Hide file tree
Showing 3 changed files with 699 additions and 17 deletions.
2 changes: 2 additions & 0 deletions src/mito2/src/memtable/merge_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
mod data;
mod dict;
mod merger;
mod metrics;
mod partition;
mod shard;
Expand Down Expand Up @@ -43,6 +44,7 @@ use crate::memtable::{
type ShardId = u32;
/// Index of a primary key in a shard.
type PkIndex = u16;

/// Id of a primary key inside a tree.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PkId {
Expand Down
107 changes: 90 additions & 17 deletions src/mito2/src/memtable/merge_tree/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,34 +42,39 @@ use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME};
use crate::error;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::merger::{DataNode, DataSource, Merger};
use crate::memtable::merge_tree::{PkId, PkIndex};

const PK_INDEX_COLUMN_NAME: &str = "__pk_index";

/// Data part batches returned by `DataParts::read`.
#[derive(Debug, Clone)]
pub struct DataBatch<'a> {
pub struct DataBatch {
/// Primary key index of this batch.
pk_index: PkIndex,
pub(crate) pk_index: PkIndex,
/// Record batch of data.
rb: &'a RecordBatch,
pub(crate) rb: RecordBatch,
/// Range of current primary key inside record batch
range: Range<usize>,
pub(crate) range: Range<usize>,
}

impl<'a> DataBatch<'a> {
impl DataBatch {
pub(crate) fn pk_index(&self) -> PkIndex {
self.pk_index
}

pub(crate) fn record_batch(&self) -> &RecordBatch {
self.rb
&self.rb
}

pub(crate) fn range(&self) -> Range<usize> {
self.range.clone()
}

pub(crate) fn is_empty(&self) -> bool {
self.range.is_empty()
}

pub(crate) fn slice_record_batch(&self) -> RecordBatch {
self.rb.slice(self.range.start, self.range.len())
}
Expand Down Expand Up @@ -314,10 +319,12 @@ impl DataBufferReader {
/// If the current reader is exhausted.
pub(crate) fn current_data_batch(&self) -> DataBatch {
let (pk_index, range) = self.current_batch.as_ref().unwrap();
let rb = self.batch.slice(range.start, range.len());
let range = 0..rb.num_rows();
DataBatch {
pk_index: *pk_index,
rb: &self.batch,
range: range.clone(),
rb,
range,
}
}

Expand Down Expand Up @@ -528,14 +535,6 @@ impl<'a> DataPartEncoder<'a> {
}
}

/// Data parts under a shard.
pub struct DataParts {
/// The active writing buffer.
pub(crate) active: DataBuffer,
/// Immutable (encoded) parts.
pub(crate) frozen: Vec<DataPart>,
}

/// Format of immutable data part.
pub enum DataPart {
Parquet(ParquetPart),
Expand Down Expand Up @@ -607,9 +606,15 @@ impl DataPartReader {
/// # Panics
/// If reader is exhausted.
pub(crate) fn current_data_batch(&self) -> DataBatch {
let rb = self.current_batch.as_ref().unwrap();
let pk_index = self.current_pk_index.unwrap();
let range = self.current_range.clone();
let rb = self
.current_batch
.as_ref()
.unwrap()
.slice(range.start, range.len());

let range = 0..rb.num_rows();
DataBatch {
pk_index,
rb,
Expand Down Expand Up @@ -654,6 +659,74 @@ pub struct ParquetPart {
data: Bytes,
}

/// All data parts that live under a single shard: one buffer that is still
/// being written to, plus the parts that have already been frozen.
pub struct DataParts {
    /// Buffer that currently receives writes.
    pub(crate) active: DataBuffer,
    /// Immutable (encoded) parts; `freeze` appends the frozen active buffer here.
    pub(crate) frozen: Vec<DataPart>,
}

impl DataParts {
    /// Creates a `DataParts` with an empty list of frozen parts and an active
    /// buffer built from `metadata` with the requested `capacity`.
    pub(crate) fn new(metadata: RegionMetadataRef, capacity: usize) -> Self {
        let active = DataBuffer::with_capacity(metadata, capacity);
        Self {
            active,
            frozen: vec![],
        }
    }

    /// Writes a single row (`kv`) for primary key `pk_id` into the active buffer.
    pub fn write_row(&mut self, pk_id: PkId, kv: KeyValue) {
        self.active.write_row(pk_id, kv);
    }

    /// Freezes the active buffer using `pk_weights` and appends the resulting
    /// part to the frozen parts.
    ///
    /// # Errors
    /// Returns the error reported by the active buffer's `freeze`; in that case
    /// nothing is appended to the frozen parts.
    pub fn freeze(&mut self, pk_weights: &[u16]) -> Result<()> {
        let part = self.active.freeze(pk_weights)?;
        self.frozen.push(part);
        Ok(())
    }

    /// Builds a merging reader over every part: the active buffer first,
    /// followed by each frozen part in insertion order.
    ///
    /// The returned reader yields a record batch of one primary key at a time,
    /// and the order in which primary keys are yielded is determined by
    /// `pk_weights`.
    ///
    /// # Errors
    /// Fails if reading the active buffer or any frozen part fails, or if the
    /// merger cannot be constructed from the resulting nodes.
    pub fn read(&mut self, pk_weights: Vec<u16>) -> Result<DataPartsReader> {
        // The weights are shared by every node, so wrap them once.
        let weights = Arc::new(pk_weights);

        let active_reader = self.active.read(&weights)?;
        let mut nodes = Vec::with_capacity(1 + self.frozen.len());
        nodes.push(DataNode::new(
            DataSource::Buffer(active_reader),
            Arc::clone(&weights),
        ));

        for part in self.frozen.iter() {
            let part_reader = part.read()?;
            nodes.push(DataNode::new(
                DataSource::Part(part_reader),
                Arc::clone(&weights),
            ));
        }

        Merger::try_new(nodes).map(|merger| DataPartsReader { merger })
    }

    /// Returns `true` when the active buffer and every frozen part are empty.
    pub(crate) fn is_empty(&self) -> bool {
        if !self.active.is_empty() {
            return false;
        }
        !self.frozen.iter().any(|part| !part.is_empty())
    }
}

/// Reader over all parts (active and frozen) inside a `DataParts`.
pub struct DataPartsReader {
    /// Merger over `DataNode`s whose current item is a `DataBatch`.
    merger: Merger<DataNode, DataBatch>,
}

impl DataPartsReader {
    /// Borrows the batch the underlying merger currently points at.
    ///
    /// NOTE(review): presumably only meaningful while `is_valid()` is true;
    /// confirm against `Merger::current_item`.
    pub(crate) fn current_data_batch(&self) -> &DataBatch {
        let merger = &self.merger;
        merger.current_item()
    }

    /// Advances the underlying merger, forwarding whatever result it returns.
    pub(crate) fn next(&mut self) -> Result<()> {
        self.merger.next()
    }

    /// Reports whether the underlying merger is still valid.
    pub(crate) fn is_valid(&self) -> bool {
        let merger = &self.merger;
        merger.is_valid()
    }
}

#[cfg(test)]
mod tests {
use datafusion::arrow::array::Float64Array;
Expand Down
Loading

0 comments on commit 5ac6b35

Please sign in to comment.