diff --git a/src/mito2/src/memtable/merge_tree.rs b/src/mito2/src/memtable/merge_tree.rs index 8a0a6031a0bf..be5db7c6a36e 100644 --- a/src/mito2/src/memtable/merge_tree.rs +++ b/src/mito2/src/memtable/merge_tree.rs @@ -16,6 +16,7 @@ mod data; mod dict; +mod merger; mod metrics; mod partition; mod shard; @@ -43,6 +44,7 @@ use crate::memtable::{ type ShardId = u32; /// Index of a primary key in a shard. type PkIndex = u16; + /// Id of a primary key inside a tree. #[derive(Debug, Clone, Copy, PartialEq, Eq)] struct PkId { diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index 20224b8af23c..5266d87a615c 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -42,34 +42,39 @@ use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME}; use crate::error; use crate::error::Result; use crate::memtable::key_values::KeyValue; +use crate::memtable::merge_tree::merger::{DataNode, DataSource, Merger}; use crate::memtable::merge_tree::{PkId, PkIndex}; const PK_INDEX_COLUMN_NAME: &str = "__pk_index"; /// Data part batches returns by `DataParts::read`. #[derive(Debug, Clone)] -pub struct DataBatch<'a> { +pub struct DataBatch { /// Primary key index of this batch. - pk_index: PkIndex, + pub(crate) pk_index: PkIndex, /// Record batch of data. - rb: &'a RecordBatch, + pub(crate) rb: RecordBatch, /// Range of current primary key inside record batch - range: Range, + pub(crate) range: Range, } -impl<'a> DataBatch<'a> { +impl DataBatch { pub(crate) fn pk_index(&self) -> PkIndex { self.pk_index } pub(crate) fn record_batch(&self) -> &RecordBatch { - self.rb + &self.rb } pub(crate) fn range(&self) -> Range { self.range.clone() } + pub(crate) fn is_empty(&self) -> bool { + self.range.is_empty() + } + pub(crate) fn slice_record_batch(&self) -> RecordBatch { self.rb.slice(self.range.start, self.range.len()) } @@ -314,10 +319,12 @@ impl DataBufferReader { /// If Current reader is exhausted. pub(crate) fn current_data_batch(&self) -> DataBatch { let (pk_index, range) = self.current_batch.as_ref().unwrap(); + let rb = self.batch.slice(range.start, range.len()); + let range = 0..rb.num_rows(); DataBatch { pk_index: *pk_index, - rb: &self.batch, - range: range.clone(), + rb, + range, } } @@ -528,14 +535,6 @@ impl<'a> DataPartEncoder<'a> { } } -/// Data parts under a shard. -pub struct DataParts { - /// The active writing buffer. - pub(crate) active: DataBuffer, - /// immutable (encoded) parts. - pub(crate) frozen: Vec, -} - /// Format of immutable data part. pub enum DataPart { Parquet(ParquetPart), @@ -607,9 +606,15 @@ impl DataPartReader { /// # Panics /// If reader is exhausted. pub(crate) fn current_data_batch(&self) -> DataBatch { - let rb = self.current_batch.as_ref().unwrap(); let pk_index = self.current_pk_index.unwrap(); let range = self.current_range.clone(); + let rb = self + .current_batch + .as_ref() + .unwrap() + .slice(range.start, range.len()); + + let range = 0..rb.num_rows(); DataBatch { pk_index, rb, @@ -654,6 +659,74 @@ pub struct ParquetPart { data: Bytes, } +/// Data parts under a shard. +pub struct DataParts { + /// The active writing buffer. + pub(crate) active: DataBuffer, + /// immutable (encoded) parts. + pub(crate) frozen: Vec, +} + +impl DataParts { + pub(crate) fn new(metadata: RegionMetadataRef, capacity: usize) -> Self { + Self { + active: DataBuffer::with_capacity(metadata, capacity), + frozen: Vec::new(), + } + } + + /// Writes one row into active part. + pub fn write_row(&mut self, pk_id: PkId, kv: KeyValue) { + self.active.write_row(pk_id, kv) + } + + /// Freezes the active data buffer into frozen data parts. + pub fn freeze(&mut self, pk_weights: &[u16]) -> Result<()> { + self.frozen.push(self.active.freeze(pk_weights)?); + Ok(()) + } + + /// Reads data from all parts including active and frozen parts. + /// The returned iterator yields a record batch of one primary key at a time. + /// The order of yielding primary keys is determined by provided weights. + pub fn read(&mut self, pk_weights: Vec) -> Result { + let weights = Arc::new(pk_weights); + let mut nodes = Vec::with_capacity(self.frozen.len() + 1); + nodes.push(DataNode::new( + DataSource::Buffer(self.active.read(&weights)?), + weights.clone(), + )); + for p in &self.frozen { + nodes.push(DataNode::new(DataSource::Part(p.read()?), weights.clone())); + } + let merger = Merger::try_new(nodes)?; + Ok(DataPartsReader { merger }) + } + + pub(crate) fn is_empty(&self) -> bool { + self.active.is_empty() && self.frozen.iter().all(|part| part.is_empty()) + } +} + +/// Reader for all parts inside a `DataParts`. +pub struct DataPartsReader { + merger: Merger, +} + +impl DataPartsReader { + pub(crate) fn current_data_batch(&self) -> &DataBatch { + self.merger.current_item() + } + + pub(crate) fn next(&mut self) -> Result<()> { + self.merger.next() + } + + pub(crate) fn is_valid(&self) -> bool { + self.merger.is_valid() + } +} + #[cfg(test)] mod tests { use datafusion::arrow::array::Float64Array; diff --git a/src/mito2/src/memtable/merge_tree/merger.rs b/src/mito2/src/memtable/merge_tree/merger.rs new file mode 100644 index 000000000000..611c7a79203d --- /dev/null +++ b/src/mito2/src/memtable/merge_tree/merger.rs @@ -0,0 +1,607 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp::Ordering; +use std::collections::BinaryHeap; +use std::ops::Range; +use std::sync::Arc; + +use datatypes::arrow::array::{ + ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, + TimestampSecondArray, +}; +use datatypes::arrow::datatypes::{DataType, TimeUnit}; + +use crate::error::Result; +use crate::memtable::merge_tree::data::{DataBatch, DataBufferReader, DataPartReader}; +use crate::memtable::merge_tree::PkIndex; + +pub trait Item: Clone { + type Key: Ord; + + /// Remaining rows in item. + fn remaining(&self) -> usize; + + /// Whether current item is exhausted. + fn is_empty(&self) -> bool; + + /// The key range of item. + fn current_range(&self) -> Range; + + /// Searches given key in current item. + fn search_key(&self, key: &Self::Key) -> std::result::Result; + + /// Slice item. + fn slice(&self, range: Range) -> Self; +} + +pub trait Node: Ord { + type Item: Item; + + /// Returns current item of node and fetch next. + fn fetch_next(&mut self) -> Result; + + /// Returns true if current node is not exhausted. + fn is_valid(&self) -> bool; + + /// Current item of node. + fn current_item(&self) -> &Self::Item; + + /// Whether the other node is behind (exclusive) current node. + fn is_behind(&self, other: &Self) -> bool; + + /// Skips first `num_to_skip` rows from node's current batch. If current batch is empty it fetches + /// next batch from the node. + /// + /// # Panics + /// If the node is EOF. + fn skip(&mut self, offset_to_skip: usize) -> Result<()>; +} + +pub struct Merger { + heap: BinaryHeap, + current_item: Option, +} + +impl Merger + where + I: Item, + T: Node, +{ + pub(crate) fn try_new(nodes: Vec) -> Result { + let mut heap = BinaryHeap::with_capacity(nodes.len()); + for node in nodes { + if node.is_valid() { + heap.push(node); + } + } + + let mut merger = Merger { + heap, + current_item: None, + }; + merger.next()?; + Ok(merger) + } + + /// Returns true if current merger is still valid. + pub(crate) fn is_valid(&self) -> bool { + self.current_item.is_some() + } + + /// Advances current merger to next item. + pub(crate) fn next(&mut self) -> Result<()> { + if let Some(mut top_node) = self.heap.pop() { + if let Some(next_node) = self.heap.peek() { + if next_node.is_behind(&top_node) { + // does not overlap + self.current_item = Some(top_node.fetch_next()?); + } else { + let next_start = next_node.current_item().current_range().start; + let res = match top_node.current_item().search_key(&next_start) { + Ok(pos) => { + // duplicate timestamp found, yield duplicated row in this item + let to_yield = top_node.current_item().slice(0..pos + 1); + top_node.skip(pos + 1).unwrap(); + to_yield + } + Err(pos) => { + // no duplicated timestamp + let to_yield = top_node.current_item().slice(0..pos); + top_node.skip(pos).unwrap(); + to_yield + } + }; + self.current_item = Some(res); + if top_node.is_valid() { + self.heap.push(top_node); + } + } + } else { + // top is the only node left. + self.current_item = Some(top_node.fetch_next()?); + if top_node.is_valid() { + self.heap.push(top_node); + } + } + } else { + // heap is empty + self.current_item = None; + } + Ok(()) + } + + /// Returns current item held by merger. + pub(crate) fn current_item(&self) -> &I { + self.current_item.as_ref().unwrap() + } +} + +#[derive(Debug)] +pub struct DataBatchKey { + pk_index: PkIndex, + timestamp: i64, +} + +impl Eq for DataBatchKey {} + +impl PartialEq for DataBatchKey { + fn eq(&self, other: &Self) -> bool { + self.pk_index == other.pk_index && self.timestamp == other.timestamp + } +} + +impl PartialOrd for DataBatchKey { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for DataBatchKey { + fn cmp(&self, other: &Self) -> Ordering { + self.pk_index + .cmp(&other.pk_index) + .then(self.timestamp.cmp(&other.timestamp)) + .reverse() + } +} + +impl DataBatch { + fn current_ts_range(&self) -> (i64, i64) { + let range = self.range(); + let ts_values = timestamp_array_to_i64_slice(self.rb.column(1)); + (ts_values[range.start], ts_values[range.end - 1]) + } +} + +impl Item for DataBatch { + type Key = DataBatchKey; + + fn remaining(&self) -> usize { + self.range().len() + } + + fn is_empty(&self) -> bool { + self.is_empty() + } + + fn current_range(&self) -> Range { + let range = self.range(); + let batch = self.record_batch(); + let pk_index = self.pk_index(); + let ts_array = batch.column(1); + + // maybe safe the result somewhere. + let ts_values = timestamp_array_to_i64_slice(ts_array); + let (ts_start, ts_end) = (ts_values[range.start], ts_values[range.end - 1]); + DataBatchKey { + pk_index, + timestamp: ts_start, + }..DataBatchKey { + pk_index, + timestamp: ts_end, + } + } + + fn search_key(&self, key: &Self::Key) -> std::result::Result { + let DataBatchKey { + pk_index, + timestamp, + } = key; + if *pk_index != self.pk_index() { + return Err(self.range().end); + } + let ts_values = timestamp_array_to_i64_slice(self.record_batch().column(1)); + ts_values.binary_search(timestamp) + } + + fn slice(&self, range: Range) -> Self { + let rb = self.rb.slice(range.start, range.len()); + let range = 0..rb.num_rows(); + Self { + pk_index: self.pk_index, + rb, + range, + } + } +} + +pub struct DataNode { + pk_weights: Arc>, + source: DataSource, + current_data_batch: Option, +} + +impl DataNode { + pub(crate) fn new(source: DataSource, weight: Arc>) -> Self { + let current_data_batch = source.current_data_batch(); + Self { + pk_weights: weight, + source, + current_data_batch: Some(current_data_batch), + } + } + + fn next(&mut self) -> Result<()> { + let next = match &mut self.source { + DataSource::Buffer(b) => { + b.next()?; + if b.is_valid() { + Some(b.current_data_batch()) + } else { + None + } + } + DataSource::Part(p) => { + p.next()?; + if p.is_valid() { + Some(p.current_data_batch()) + } else { + None + } + } + }; + self.current_data_batch = next; + Ok(()) + } + + fn current_data_batch(&self) -> &DataBatch { + self.current_data_batch.as_ref().unwrap() + } +} + +pub enum DataSource { + Buffer(DataBufferReader), + Part(DataPartReader), +} + +impl DataSource { + pub(crate) fn current_data_batch(&self) -> DataBatch { + match self { + DataSource::Buffer(buffer) => buffer.current_data_batch(), + DataSource::Part(p) => p.current_data_batch(), + } + } +} + +impl Ord for DataNode { + fn cmp(&self, other: &Self) -> Ordering { + self.current_data_batch() + .current_range() + .start + .cmp(&other.current_data_batch().current_range().start) + } +} + +impl Eq for DataNode {} + +impl PartialEq for DataNode { + fn eq(&self, other: &Self) -> bool { + self.current_data_batch() + .current_range() + .start + .eq(&other.current_data_batch().current_range().start) + } +} + +#[allow(clippy::non_canonical_partial_ord_impl)] +impl PartialOrd for DataNode { + fn partial_cmp(&self, other: &Self) -> Option { + Some( + ( + self.pk_weights[self.current_data_batch().pk_index as usize], + self.current_data_batch().current_ts_range().0, + ) + .cmp(&( + self.pk_weights[other.current_data_batch().pk_index as usize], + other.current_data_batch().current_ts_range().0, + )) + .reverse(), + ) + } +} + +impl Node for DataNode { + type Item = DataBatch; + + fn fetch_next(&mut self) -> Result { + let current = self.current_data_batch.take(); + Ok(current.unwrap()) + } + + fn is_valid(&self) -> bool { + self.current_data_batch.is_some() + } + + fn current_item(&self) -> &Self::Item { + self.current_data_batch() + } + + fn is_behind(&self, other: &Self) -> bool { + let pk_weight = self.pk_weights[self.current_data_batch().pk_index as usize]; + let start = self.current_data_batch().current_ts_range().0; + + let other_pk_weight = self.pk_weights[other.current_data_batch().pk_index as usize]; + let other_end = other.current_data_batch().current_ts_range().1; + (pk_weight, start) > (other_pk_weight, other_end) + } + + fn skip(&mut self, offset_to_skip: usize) -> Result<()> { + let current = self.current_item(); + let remaining = current.remaining() - offset_to_skip; + if remaining == 0 { + self.next()?; + } else { + let end = current.remaining(); + self.current_data_batch = Some(current.slice(offset_to_skip..end)); + } + + Ok(()) + } +} + +fn timestamp_array_to_i64_slice(arr: &ArrayRef) -> &[i64] { + match arr.data_type() { + DataType::Timestamp(t, _) => match t { + TimeUnit::Second => arr + .as_any() + .downcast_ref::() + .unwrap() + .values(), + TimeUnit::Millisecond => arr + .as_any() + .downcast_ref::() + .unwrap() + .values(), + TimeUnit::Microsecond => arr + .as_any() + .downcast_ref::() + .unwrap() + .values(), + TimeUnit::Nanosecond => arr + .as_any() + .downcast_ref::() + .unwrap() + .values(), + }, + _ => unreachable!(), + } +} + +#[cfg(test)] +mod tests { + use datatypes::arrow::array::UInt64Array; + use store_api::metadata::RegionMetadataRef; + + use super::*; + use crate::memtable::merge_tree::data::DataBuffer; + use crate::memtable::merge_tree::PkId; + use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test}; + + fn write_rows_to_buffer( + buffer: &mut DataBuffer, + schema: &RegionMetadataRef, + pk_index: u16, + ts: Vec, + sequence: &mut u64, + ) { + let rows = ts.len() as u64; + let v0 = ts.iter().map(|v| Some(*v as f64)).collect::>(); + let kvs = build_key_values_with_ts_seq_values( + schema, + "whatever".to_string(), + 1, + ts.into_iter(), + v0.into_iter(), + *sequence, + ); + + for kv in kvs.iter() { + buffer.write_row( + PkId { + shard_id: 0, + pk_index, + }, + kv, + ); + } + + *sequence += rows; + } + + fn check_merger_read(nodes: Vec, expected: &[(u16, Vec<(i64, u64)>)]) { + let mut merger = Merger::try_new(nodes).unwrap(); + + let mut res = vec![]; + while merger.is_valid() { + let data_batch = merger.current_item(); + let batch = data_batch.slice_record_batch(); + let ts_array = batch.column(1); + let ts_values: Vec<_> = match ts_array.data_type() { + DataType::Timestamp(t, _) => match t { + TimeUnit::Second => ts_array + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| v.unwrap()) + .collect(), + TimeUnit::Millisecond => ts_array + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| v.unwrap()) + .collect(), + TimeUnit::Microsecond => ts_array + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| v.unwrap()) + .collect(), + TimeUnit::Nanosecond => ts_array + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| v.unwrap()) + .collect(), + }, + _ => unreachable!(), + }; + + let ts_and_seq = ts_values + .into_iter() + .zip( + batch + .column(2) + .as_any() + .downcast_ref::() + .unwrap() + .iter(), + ) + .map(|(ts, seq)| (ts, seq.unwrap())) + .collect::>(); + + res.push((data_batch.pk_index, ts_and_seq)); + merger.next().unwrap(); + } + assert_eq!(expected, &res); + } + + #[test] + fn test_merger() { + let metadata = metadata_for_test(); + let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10); + let weight = Arc::new(vec![2, 1, 0]); + let mut seq = 0; + write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2], &mut seq); + let node1 = DataNode::new( + DataSource::Buffer(buffer1.read(&weight).unwrap()), + weight.clone(), + ); + + let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10); + write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![2, 3], &mut seq); + let node2 = DataNode::new( + DataSource::Buffer(buffer2.read(&weight).unwrap()), + weight.clone(), + ); + + let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10); + write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2, 3], &mut seq); + let node3 = DataNode::new(DataSource::Buffer(buffer3.read(&weight).unwrap()), weight); + + check_merger_read( + vec![node1, node3, node2], + &[ + (2, vec![(1, 0), (2, 1)]), + (2, vec![(2, 4), (3, 5)]), + (1, vec![(2, 2), (3, 3)]), + ], + ); + } + + #[test] + fn test_merger_overlapping() { + let metadata = metadata_for_test(); + let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10); + let weight = Arc::new(vec![0, 1, 2]); + let mut seq = 0; + write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2, 3], &mut seq); + let node1 = DataNode::new( + DataSource::Buffer(buffer1.read(&weight).unwrap()), + weight.clone(), + ); + + let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10); + write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![2, 3], &mut seq); + let node2 = DataNode::new( + DataSource::Buffer(buffer2.read(&weight).unwrap()), + weight.clone(), + ); + + let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10); + write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2, 3], &mut seq); + let node3 = DataNode::new(DataSource::Buffer(buffer3.read(&weight).unwrap()), weight); + + check_merger_read( + vec![node1, node3, node2], + &[ + (0, vec![(1, 0), (2, 1)]), + (0, vec![(2, 5), (3, 6)]), + (0, vec![(3, 2)]), + (1, vec![(2, 3), (3, 4)]), + ], + ); + } + + #[test] + fn test_merger_parts_and_buffer() { + let metadata = metadata_for_test(); + let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10); + let weight = Arc::new(vec![0, 1, 2]); + let mut seq = 0; + write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2, 3], &mut seq); + let node1 = DataNode::new( + DataSource::Buffer(buffer1.read(&weight).unwrap()), + weight.clone(), + ); + + let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10); + write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![2, 3], &mut seq); + let node2 = DataNode::new( + DataSource::Part(buffer2.freeze(&weight).unwrap().read().unwrap()), + weight.clone(), + ); + + let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10); + write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2, 3], &mut seq); + let node3 = DataNode::new( + DataSource::Part(buffer3.freeze(&weight).unwrap().read().unwrap()), + weight.clone(), + ); + + check_merger_read( + vec![node1, node3, node2], + &[ + (0, vec![(1, 0), (2, 1)]), + (0, vec![(2, 5), (3, 6)]), + (0, vec![(3, 2)]), + (1, vec![(2, 3), (3, 4)]), + ], + ); + } +}