From 5ac6b354e5a2b4d0ed28d82800479eae0422da45 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Thu, 22 Feb 2024 17:46:20 +0800 Subject: [PATCH 1/9] feat: impl merge reader for DataParts --- src/mito2/src/memtable/merge_tree.rs | 2 + src/mito2/src/memtable/merge_tree/data.rs | 107 +++- src/mito2/src/memtable/merge_tree/merger.rs | 607 ++++++++++++++++++++ 3 files changed, 699 insertions(+), 17 deletions(-) create mode 100644 src/mito2/src/memtable/merge_tree/merger.rs diff --git a/src/mito2/src/memtable/merge_tree.rs b/src/mito2/src/memtable/merge_tree.rs index 8a0a6031a0bf..be5db7c6a36e 100644 --- a/src/mito2/src/memtable/merge_tree.rs +++ b/src/mito2/src/memtable/merge_tree.rs @@ -16,6 +16,7 @@ mod data; mod dict; +mod merger; mod metrics; mod partition; mod shard; @@ -43,6 +44,7 @@ use crate::memtable::{ type ShardId = u32; /// Index of a primary key in a shard. type PkIndex = u16; + /// Id of a primary key inside a tree. #[derive(Debug, Clone, Copy, PartialEq, Eq)] struct PkId { diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index 20224b8af23c..5266d87a615c 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -42,34 +42,39 @@ use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME}; use crate::error; use crate::error::Result; use crate::memtable::key_values::KeyValue; +use crate::memtable::merge_tree::merger::{DataNode, DataSource, Merger}; use crate::memtable::merge_tree::{PkId, PkIndex}; const PK_INDEX_COLUMN_NAME: &str = "__pk_index"; /// Data part batches returns by `DataParts::read`. #[derive(Debug, Clone)] -pub struct DataBatch<'a> { +pub struct DataBatch { /// Primary key index of this batch. - pk_index: PkIndex, + pub(crate) pk_index: PkIndex, /// Record batch of data. 
- rb: &'a RecordBatch, + pub(crate) rb: RecordBatch, /// Range of current primary key inside record batch - range: Range, + pub(crate) range: Range, } -impl<'a> DataBatch<'a> { +impl DataBatch { pub(crate) fn pk_index(&self) -> PkIndex { self.pk_index } pub(crate) fn record_batch(&self) -> &RecordBatch { - self.rb + &self.rb } pub(crate) fn range(&self) -> Range { self.range.clone() } + pub(crate) fn is_empty(&self) -> bool { + self.range.is_empty() + } + pub(crate) fn slice_record_batch(&self) -> RecordBatch { self.rb.slice(self.range.start, self.range.len()) } @@ -314,10 +319,12 @@ impl DataBufferReader { /// If Current reader is exhausted. pub(crate) fn current_data_batch(&self) -> DataBatch { let (pk_index, range) = self.current_batch.as_ref().unwrap(); + let rb = self.batch.slice(range.start, range.len()); + let range = 0..rb.num_rows(); DataBatch { pk_index: *pk_index, - rb: &self.batch, - range: range.clone(), + rb, + range, } } @@ -528,14 +535,6 @@ impl<'a> DataPartEncoder<'a> { } } -/// Data parts under a shard. -pub struct DataParts { - /// The active writing buffer. - pub(crate) active: DataBuffer, - /// immutable (encoded) parts. - pub(crate) frozen: Vec, -} - /// Format of immutable data part. pub enum DataPart { Parquet(ParquetPart), @@ -607,9 +606,15 @@ impl DataPartReader { /// # Panics /// If reader is exhausted. pub(crate) fn current_data_batch(&self) -> DataBatch { - let rb = self.current_batch.as_ref().unwrap(); let pk_index = self.current_pk_index.unwrap(); let range = self.current_range.clone(); + let rb = self + .current_batch + .as_ref() + .unwrap() + .slice(range.start, range.len()); + + let range = 0..rb.num_rows(); DataBatch { pk_index, rb, @@ -654,6 +659,74 @@ pub struct ParquetPart { data: Bytes, } +/// Data parts under a shard. +pub struct DataParts { + /// The active writing buffer. + pub(crate) active: DataBuffer, + /// immutable (encoded) parts. 
+ pub(crate) frozen: Vec, +} + +impl DataParts { + pub(crate) fn new(metadata: RegionMetadataRef, capacity: usize) -> Self { + Self { + active: DataBuffer::with_capacity(metadata, capacity), + frozen: Vec::new(), + } + } + + /// Writes one row into active part. + pub fn write_row(&mut self, pk_id: PkId, kv: KeyValue) { + self.active.write_row(pk_id, kv) + } + + /// Freezes the active data buffer into frozen data parts. + pub fn freeze(&mut self, pk_weights: &[u16]) -> Result<()> { + self.frozen.push(self.active.freeze(pk_weights)?); + Ok(()) + } + + /// Reads data from all parts including active and frozen parts. + /// The returned iterator yields a record batch of one primary key at a time. + /// The order of yielding primary keys is determined by provided weights. + pub fn read(&mut self, pk_weights: Vec) -> Result { + let weights = Arc::new(pk_weights); + let mut nodes = Vec::with_capacity(self.frozen.len() + 1); + nodes.push(DataNode::new( + DataSource::Buffer(self.active.read(&weights)?), + weights.clone(), + )); + for p in &self.frozen { + nodes.push(DataNode::new(DataSource::Part(p.read()?), weights.clone())); + } + let merger = Merger::try_new(nodes)?; + Ok(DataPartsReader { merger }) + } + + pub(crate) fn is_empty(&self) -> bool { + self.active.is_empty() && self.frozen.iter().all(|part| part.is_empty()) + } +} + +/// Reader for all parts inside a `DataParts`. 
+pub struct DataPartsReader { + merger: Merger, +} + +impl DataPartsReader { + pub(crate) fn current_data_batch(&self) -> &DataBatch { + self.merger.current_item() + } + + pub(crate) fn next(&mut self) -> Result<()> { + self.merger.next() + } + + pub(crate) fn is_valid(&self) -> bool { + self.merger.is_valid() + } +} + #[cfg(test)] mod tests { use datafusion::arrow::array::Float64Array; diff --git a/src/mito2/src/memtable/merge_tree/merger.rs b/src/mito2/src/memtable/merge_tree/merger.rs new file mode 100644 index 000000000000..611c7a79203d --- /dev/null +++ b/src/mito2/src/memtable/merge_tree/merger.rs @@ -0,0 +1,607 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp::Ordering; +use std::collections::BinaryHeap; +use std::ops::Range; +use std::sync::Arc; + +use datatypes::arrow::array::{ + ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, + TimestampSecondArray, +}; +use datatypes::arrow::datatypes::{DataType, TimeUnit}; + +use crate::error::Result; +use crate::memtable::merge_tree::data::{DataBatch, DataBufferReader, DataPartReader}; +use crate::memtable::merge_tree::PkIndex; + +pub trait Item: Clone { + type Key: Ord; + + /// Remaining rows in item. + fn remaining(&self) -> usize; + + /// Whether current item is exhausted. + fn is_empty(&self) -> bool; + + /// The key range of item. + fn current_range(&self) -> Range; + + /// Searches given key in current item. 
+ fn search_key(&self, key: &Self::Key) -> std::result::Result; + + /// Slice item. + fn slice(&self, range: Range) -> Self; +} + +pub trait Node: Ord { + type Item: Item; + + /// Returns current item of node and fetch next. + fn fetch_next(&mut self) -> Result; + + /// Returns true if current node is not exhausted. + fn is_valid(&self) -> bool; + + /// Current item of node. + fn current_item(&self) -> &Self::Item; + + /// Whether the other node is behind (exclusive) current node. + fn is_behind(&self, other: &Self) -> bool; + + /// Skips first `num_to_skip` rows from node's current batch. If current batch is empty it fetches + /// next batch from the node. + /// + /// # Panics + /// If the node is EOF. + fn skip(&mut self, offset_to_skip: usize) -> Result<()>; +} + +pub struct Merger { + heap: BinaryHeap, + current_item: Option, +} + +impl Merger + where + I: Item, + T: Node, +{ + pub(crate) fn try_new(nodes: Vec) -> Result { + let mut heap = BinaryHeap::with_capacity(nodes.len()); + for node in nodes { + if node.is_valid() { + heap.push(node); + } + } + + let mut merger = Merger { + heap, + current_item: None, + }; + merger.next()?; + Ok(merger) + } + + /// Returns true if current merger is still valid. + pub(crate) fn is_valid(&self) -> bool { + self.current_item.is_some() + } + + /// Advances current merger to next item. 
+ pub(crate) fn next(&mut self) -> Result<()> { + if let Some(mut top_node) = self.heap.pop() { + if let Some(next_node) = self.heap.peek() { + if next_node.is_behind(&top_node) { + // does not overlap + self.current_item = Some(top_node.fetch_next()?); + } else { + let next_start = next_node.current_item().current_range().start; + let res = match top_node.current_item().search_key(&next_start) { + Ok(pos) => { + // duplicate timestamp found, yield duplicated row in this item + let to_yield = top_node.current_item().slice(0..pos + 1); + top_node.skip(pos + 1).unwrap(); + to_yield + } + Err(pos) => { + // no duplicated timestamp + let to_yield = top_node.current_item().slice(0..pos); + top_node.skip(pos).unwrap(); + to_yield + } + }; + self.current_item = Some(res); + if top_node.is_valid() { + self.heap.push(top_node); + } + } + } else { + // top is the only node left. + self.current_item = Some(top_node.fetch_next()?); + if top_node.is_valid() { + self.heap.push(top_node); + } + } + } else { + // heap is empty + self.current_item = None; + } + Ok(()) + } + + /// Returns current item held by merger. 
+ pub(crate) fn current_item(&self) -> &I { + self.current_item.as_ref().unwrap() + } +} + +#[derive(Debug)] +pub struct DataBatchKey { + pk_index: PkIndex, + timestamp: i64, +} + +impl Eq for DataBatchKey {} + +impl PartialEq for DataBatchKey { + fn eq(&self, other: &Self) -> bool { + self.pk_index == other.pk_index && self.timestamp == other.timestamp + } +} + +impl PartialOrd for DataBatchKey { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for DataBatchKey { + fn cmp(&self, other: &Self) -> Ordering { + self.pk_index + .cmp(&other.pk_index) + .then(self.timestamp.cmp(&other.timestamp)) + .reverse() + } +} + +impl DataBatch { + fn current_ts_range(&self) -> (i64, i64) { + let range = self.range(); + let ts_values = timestamp_array_to_i64_slice(self.rb.column(1)); + (ts_values[range.start], ts_values[range.end - 1]) + } +} + +impl Item for DataBatch { + type Key = DataBatchKey; + + fn remaining(&self) -> usize { + self.range().len() + } + + fn is_empty(&self) -> bool { + self.is_empty() + } + + fn current_range(&self) -> Range { + let range = self.range(); + let batch = self.record_batch(); + let pk_index = self.pk_index(); + let ts_array = batch.column(1); + + // maybe safe the result somewhere. 
+ let ts_values = timestamp_array_to_i64_slice(ts_array); + let (ts_start, ts_end) = (ts_values[range.start], ts_values[range.end - 1]); + DataBatchKey { + pk_index, + timestamp: ts_start, + }..DataBatchKey { + pk_index, + timestamp: ts_end, + } + } + + fn search_key(&self, key: &Self::Key) -> std::result::Result { + let DataBatchKey { + pk_index, + timestamp, + } = key; + if *pk_index != self.pk_index() { + return Err(self.range().end); + } + let ts_values = timestamp_array_to_i64_slice(self.record_batch().column(1)); + ts_values.binary_search(timestamp) + } + + fn slice(&self, range: Range) -> Self { + let rb = self.rb.slice(range.start, range.len()); + let range = 0..rb.num_rows(); + Self { + pk_index: self.pk_index, + rb, + range, + } + } +} + +pub struct DataNode { + pk_weights: Arc>, + source: DataSource, + current_data_batch: Option, +} + +impl DataNode { + pub(crate) fn new(source: DataSource, weight: Arc>) -> Self { + let current_data_batch = source.current_data_batch(); + Self { + pk_weights: weight, + source, + current_data_batch: Some(current_data_batch), + } + } + + fn next(&mut self) -> Result<()> { + let next = match &mut self.source { + DataSource::Buffer(b) => { + b.next()?; + if b.is_valid() { + Some(b.current_data_batch()) + } else { + None + } + } + DataSource::Part(p) => { + p.next()?; + if p.is_valid() { + Some(p.current_data_batch()) + } else { + None + } + } + }; + self.current_data_batch = next; + Ok(()) + } + + fn current_data_batch(&self) -> &DataBatch { + self.current_data_batch.as_ref().unwrap() + } +} + +pub enum DataSource { + Buffer(DataBufferReader), + Part(DataPartReader), +} + +impl DataSource { + pub(crate) fn current_data_batch(&self) -> DataBatch { + match self { + DataSource::Buffer(buffer) => buffer.current_data_batch(), + DataSource::Part(p) => p.current_data_batch(), + } + } +} + +impl Ord for DataNode { + fn cmp(&self, other: &Self) -> Ordering { + self.current_data_batch() + .current_range() + .start + 
.cmp(&other.current_data_batch().current_range().start) + } +} + +impl Eq for DataNode {} + +impl PartialEq for DataNode { + fn eq(&self, other: &Self) -> bool { + self.current_data_batch() + .current_range() + .start + .eq(&other.current_data_batch().current_range().start) + } +} + +#[allow(clippy::non_canonical_partial_ord_impl)] +impl PartialOrd for DataNode { + fn partial_cmp(&self, other: &Self) -> Option { + Some( + ( + self.pk_weights[self.current_data_batch().pk_index as usize], + self.current_data_batch().current_ts_range().0, + ) + .cmp(&( + self.pk_weights[other.current_data_batch().pk_index as usize], + other.current_data_batch().current_ts_range().0, + )) + .reverse(), + ) + } +} + +impl Node for DataNode { + type Item = DataBatch; + + fn fetch_next(&mut self) -> Result { + let current = self.current_data_batch.take(); + Ok(current.unwrap()) + } + + fn is_valid(&self) -> bool { + self.current_data_batch.is_some() + } + + fn current_item(&self) -> &Self::Item { + self.current_data_batch() + } + + fn is_behind(&self, other: &Self) -> bool { + let pk_weight = self.pk_weights[self.current_data_batch().pk_index as usize]; + let start = self.current_data_batch().current_ts_range().0; + + let other_pk_weight = self.pk_weights[other.current_data_batch().pk_index as usize]; + let other_end = other.current_data_batch().current_ts_range().1; + (pk_weight, start) > (other_pk_weight, other_end) + } + + fn skip(&mut self, offset_to_skip: usize) -> Result<()> { + let current = self.current_item(); + let remaining = current.remaining() - offset_to_skip; + if remaining == 0 { + self.next()?; + } else { + let end = current.remaining(); + self.current_data_batch = Some(current.slice(offset_to_skip..end)); + } + + Ok(()) + } +} + +fn timestamp_array_to_i64_slice(arr: &ArrayRef) -> &[i64] { + match arr.data_type() { + DataType::Timestamp(t, _) => match t { + TimeUnit::Second => arr + .as_any() + .downcast_ref::() + .unwrap() + .values(), + TimeUnit::Millisecond => arr + 
.as_any() + .downcast_ref::() + .unwrap() + .values(), + TimeUnit::Microsecond => arr + .as_any() + .downcast_ref::() + .unwrap() + .values(), + TimeUnit::Nanosecond => arr + .as_any() + .downcast_ref::() + .unwrap() + .values(), + }, + _ => unreachable!(), + } +} + +#[cfg(test)] +mod tests { + use datatypes::arrow::array::UInt64Array; + use store_api::metadata::RegionMetadataRef; + + use super::*; + use crate::memtable::merge_tree::data::DataBuffer; + use crate::memtable::merge_tree::PkId; + use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test}; + + fn write_rows_to_buffer( + buffer: &mut DataBuffer, + schema: &RegionMetadataRef, + pk_index: u16, + ts: Vec, + sequence: &mut u64, + ) { + let rows = ts.len() as u64; + let v0 = ts.iter().map(|v| Some(*v as f64)).collect::>(); + let kvs = build_key_values_with_ts_seq_values( + schema, + "whatever".to_string(), + 1, + ts.into_iter(), + v0.into_iter(), + *sequence, + ); + + for kv in kvs.iter() { + buffer.write_row( + PkId { + shard_id: 0, + pk_index, + }, + kv, + ); + } + + *sequence += rows; + } + + fn check_merger_read(nodes: Vec, expected: &[(u16, Vec<(i64, u64)>)]) { + let mut merger = Merger::try_new(nodes).unwrap(); + + let mut res = vec![]; + while merger.is_valid() { + let data_batch = merger.current_item(); + let batch = data_batch.slice_record_batch(); + let ts_array = batch.column(1); + let ts_values: Vec<_> = match ts_array.data_type() { + DataType::Timestamp(t, _) => match t { + TimeUnit::Second => ts_array + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| v.unwrap()) + .collect(), + TimeUnit::Millisecond => ts_array + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| v.unwrap()) + .collect(), + TimeUnit::Microsecond => ts_array + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| v.unwrap()) + .collect(), + TimeUnit::Nanosecond => ts_array + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| v.unwrap()) + 
.collect(), + }, + _ => unreachable!(), + }; + + let ts_and_seq = ts_values + .into_iter() + .zip( + batch + .column(2) + .as_any() + .downcast_ref::() + .unwrap() + .iter(), + ) + .map(|(ts, seq)| (ts, seq.unwrap())) + .collect::>(); + + res.push((data_batch.pk_index, ts_and_seq)); + merger.next().unwrap(); + } + assert_eq!(expected, &res); + } + + #[test] + fn test_merger() { + let metadata = metadata_for_test(); + let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10); + let weight = Arc::new(vec![2, 1, 0]); + let mut seq = 0; + write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2], &mut seq); + let node1 = DataNode::new( + DataSource::Buffer(buffer1.read(&weight).unwrap()), + weight.clone(), + ); + + let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10); + write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![2, 3], &mut seq); + let node2 = DataNode::new( + DataSource::Buffer(buffer2.read(&weight).unwrap()), + weight.clone(), + ); + + let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10); + write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2, 3], &mut seq); + let node3 = DataNode::new(DataSource::Buffer(buffer3.read(&weight).unwrap()), weight); + + check_merger_read( + vec![node1, node3, node2], + &[ + (2, vec![(1, 0), (2, 1)]), + (2, vec![(2, 4), (3, 5)]), + (1, vec![(2, 2), (3, 3)]), + ], + ); + } + + #[test] + fn test_merger_overlapping() { + let metadata = metadata_for_test(); + let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10); + let weight = Arc::new(vec![0, 1, 2]); + let mut seq = 0; + write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2, 3], &mut seq); + let node1 = DataNode::new( + DataSource::Buffer(buffer1.read(&weight).unwrap()), + weight.clone(), + ); + + let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10); + write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![2, 3], &mut seq); + let node2 = DataNode::new( + DataSource::Buffer(buffer2.read(&weight).unwrap()), + 
weight.clone(), + ); + + let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10); + write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2, 3], &mut seq); + let node3 = DataNode::new(DataSource::Buffer(buffer3.read(&weight).unwrap()), weight); + + check_merger_read( + vec![node1, node3, node2], + &[ + (0, vec![(1, 0), (2, 1)]), + (0, vec![(2, 5), (3, 6)]), + (0, vec![(3, 2)]), + (1, vec![(2, 3), (3, 4)]), + ], + ); + } + + #[test] + fn test_merger_parts_and_buffer() { + let metadata = metadata_for_test(); + let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10); + let weight = Arc::new(vec![0, 1, 2]); + let mut seq = 0; + write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2, 3], &mut seq); + let node1 = DataNode::new( + DataSource::Buffer(buffer1.read(&weight).unwrap()), + weight.clone(), + ); + + let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10); + write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![2, 3], &mut seq); + let node2 = DataNode::new( + DataSource::Part(buffer2.freeze(&weight).unwrap().read().unwrap()), + weight.clone(), + ); + + let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10); + write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2, 3], &mut seq); + let node3 = DataNode::new( + DataSource::Part(buffer3.freeze(&weight).unwrap().read().unwrap()), + weight.clone(), + ); + + check_merger_read( + vec![node1, node3, node2], + &[ + (0, vec![(1, 0), (2, 1)]), + (0, vec![(2, 5), (3, 6)]), + (0, vec![(3, 2)]), + (1, vec![(2, 3), (3, 4)]), + ], + ); + } +} From 18e8128a5433043daadbf058f9a77905cb2257c5 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Thu, 22 Feb 2024 19:17:51 +0800 Subject: [PATCH 2/9] fix: fmt --- src/mito2/src/memtable/merge_tree/merger.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/merger.rs b/src/mito2/src/memtable/merge_tree/merger.rs index 611c7a79203d..1edb1a558df5 100644 --- 
a/src/mito2/src/memtable/merge_tree/merger.rs +++ b/src/mito2/src/memtable/merge_tree/merger.rs @@ -75,9 +75,9 @@ pub struct Merger { } impl Merger - where - I: Item, - T: Node, +where + I: Item, + T: Node, { pub(crate) fn try_new(nodes: Vec) -> Result { let mut heap = BinaryHeap::with_capacity(nodes.len()); From 780e824534060c99dcbee8f769981dc75215360d Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Thu, 22 Feb 2024 20:20:59 +0800 Subject: [PATCH 3/9] fix: sort rows with pk and ts according to sequence desc --- src/mito2/src/memtable/merge_tree/merger.rs | 160 ++++++++++++-------- 1 file changed, 95 insertions(+), 65 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/merger.rs b/src/mito2/src/memtable/merge_tree/merger.rs index 1edb1a558df5..f8095eadc6c6 100644 --- a/src/mito2/src/memtable/merge_tree/merger.rs +++ b/src/mito2/src/memtable/merge_tree/merger.rs @@ -12,14 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License.
-use std::cmp::Ordering; +use std::cmp::{Ordering, Reverse}; use std::collections::BinaryHeap; use std::ops::Range; use std::sync::Arc; use datatypes::arrow::array::{ ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, - TimestampSecondArray, + TimestampSecondArray, UInt64Array, }; use datatypes::arrow::datatypes::{DataType, TimeUnit}; @@ -75,9 +75,9 @@ pub struct Merger { } impl Merger -where - I: Item, - T: Node, + where + I: Item, + T: Node, { pub(crate) fn try_new(nodes: Vec) -> Result { let mut heap = BinaryHeap::with_capacity(nodes.len()); @@ -112,14 +112,20 @@ where let res = match top_node.current_item().search_key(&next_start) { Ok(pos) => { // duplicate timestamp found, yield duplicated row in this item - let to_yield = top_node.current_item().slice(0..pos + 1); - top_node.skip(pos + 1).unwrap(); - to_yield + if pos == 0 { + let to_yield = top_node.current_item().slice(0..1); + top_node.skip(1)?; + to_yield + } else { + let to_yield = top_node.current_item().slice(0..pos); + top_node.skip(pos)?; + to_yield + } } Err(pos) => { // no duplicated timestamp let to_yield = top_node.current_item().slice(0..pos); - top_node.skip(pos).unwrap(); + top_node.skip(pos)?; to_yield } }; @@ -178,10 +184,30 @@ impl Ord for DataBatchKey { } impl DataBatch { - fn current_ts_range(&self) -> (i64, i64) { + fn first_row(&self) -> (i64, u64) { + let range = self.range(); + let ts_values = timestamp_array_to_i64_slice(self.rb.column(1)); + let sequence_values = self + .rb + .column(2) + .as_any() + .downcast_ref::() + .unwrap() + .values(); + (ts_values[range.start], sequence_values[range.start]) + } + + fn last_row(&self) -> (i64, u64) { let range = self.range(); let ts_values = timestamp_array_to_i64_slice(self.rb.column(1)); - (ts_values[range.start], ts_values[range.end - 1]) + let sequence_values = self + .rb + .column(2) + .as_any() + .downcast_ref::() + .unwrap() + .values(); + (ts_values[range.end - 1], sequence_values[range.end - 
1]) } } @@ -309,24 +335,22 @@ impl Eq for DataNode {} impl PartialEq for DataNode { fn eq(&self, other: &Self) -> bool { self.current_data_batch() - .current_range() - .start - .eq(&other.current_data_batch().current_range().start) + .first_row() + .eq(&other.current_data_batch().first_row()) } } #[allow(clippy::non_canonical_partial_ord_impl)] impl PartialOrd for DataNode { fn partial_cmp(&self, other: &Self) -> Option { + let weight = self.pk_weights[self.current_data_batch().pk_index as usize]; + let (ts_start, sequence) = self.current_data_batch().first_row(); + let other_weight = self.pk_weights[other.current_data_batch().pk_index as usize]; + let (other_ts_start, other_sequence) = other.current_data_batch().first_row(); + Some( - ( - self.pk_weights[self.current_data_batch().pk_index as usize], - self.current_data_batch().current_ts_range().0, - ) - .cmp(&( - self.pk_weights[other.current_data_batch().pk_index as usize], - other.current_data_batch().current_ts_range().0, - )) + (weight, ts_start, Reverse(sequence)) + .cmp(&(other_weight, other_ts_start, Reverse(other_sequence))) .reverse(), ) } @@ -350,10 +374,9 @@ impl Node for DataNode { fn is_behind(&self, other: &Self) -> bool { let pk_weight = self.pk_weights[self.current_data_batch().pk_index as usize]; - let start = self.current_data_batch().current_ts_range().0; - + let (start, _) = self.current_data_batch().first_row(); let other_pk_weight = self.pk_weights[other.current_data_batch().pk_index as usize]; - let other_end = other.current_data_batch().current_ts_range().1; + let (other_end, _) = other.current_data_batch().last_row(); (pk_weight, start) > (other_pk_weight, other_end) } @@ -448,40 +471,7 @@ mod tests { let data_batch = merger.current_item(); let batch = data_batch.slice_record_batch(); let ts_array = batch.column(1); - let ts_values: Vec<_> = match ts_array.data_type() { - DataType::Timestamp(t, _) => match t { - TimeUnit::Second => ts_array - .as_any() - .downcast_ref::() - .unwrap() - 
.iter() - .map(|v| v.unwrap()) - .collect(), - TimeUnit::Millisecond => ts_array - .as_any() - .downcast_ref::() - .unwrap() - .iter() - .map(|v| v.unwrap()) - .collect(), - TimeUnit::Microsecond => ts_array - .as_any() - .downcast_ref::() - .unwrap() - .iter() - .map(|v| v.unwrap()) - .collect(), - TimeUnit::Nanosecond => ts_array - .as_any() - .downcast_ref::() - .unwrap() - .iter() - .map(|v| v.unwrap()) - .collect(), - }, - _ => unreachable!(), - }; - + let ts_values: Vec<_> = timestamp_array_to_i64_slice(ts_array).to_vec(); let ts_and_seq = ts_values .into_iter() .zip( @@ -527,8 +517,10 @@ mod tests { check_merger_read( vec![node1, node3, node2], &[ - (2, vec![(1, 0), (2, 1)]), - (2, vec![(2, 4), (3, 5)]), + (2, vec![(1, 0)]), + (2, vec![(2, 4)]), + (2, vec![(2, 1)]), + (2, vec![(3, 5)]), (1, vec![(2, 2), (3, 3)]), ], ); @@ -560,8 +552,10 @@ mod tests { check_merger_read( vec![node1, node3, node2], &[ - (0, vec![(1, 0), (2, 1)]), - (0, vec![(2, 5), (3, 6)]), + (0, vec![(1, 0)]), + (0, vec![(2, 5)]), + (0, vec![(2, 1)]), + (0, vec![(3, 6)]), (0, vec![(3, 2)]), (1, vec![(2, 3), (3, 4)]), ], @@ -597,11 +591,47 @@ mod tests { check_merger_read( vec![node1, node3, node2], &[ - (0, vec![(1, 0), (2, 1)]), - (0, vec![(2, 5), (3, 6)]), + (0, vec![(1, 0)]), + (0, vec![(2, 5)]), + (0, vec![(2, 1)]), + (0, vec![(3, 6)]), (0, vec![(3, 2)]), (1, vec![(2, 3), (3, 4)]), ], ); } + + #[test] + fn test_merger_overlapping_2() { + let metadata = metadata_for_test(); + let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10); + let weight = Arc::new(vec![0, 1, 2]); + let mut seq = 0; + write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2, 2], &mut seq); + let node1 = DataNode::new( + DataSource::Buffer(buffer1.read(&weight).unwrap()), + weight.clone(), + ); + + let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10); + write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2], &mut seq); + let node3 = DataNode::new( + 
DataSource::Buffer(buffer3.read(&weight).unwrap()), + weight.clone(), + ); + + let mut buffer4 = DataBuffer::with_capacity(metadata.clone(), 10); + write_rows_to_buffer(&mut buffer4, &metadata, 0, vec![2], &mut seq); + let node4 = DataNode::new(DataSource::Buffer(buffer4.read(&weight).unwrap()), weight); + + check_merger_read( + vec![node1, node3, node4], + &[ + (0, vec![(1, 0)]), + (0, vec![(2, 4)]), + (0, vec![(2, 3)]), + (0, vec![(2, 2)]), + ], + ); + } } From 171779a4d2a84d4858760f53565fd7915f91279f Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Thu, 22 Feb 2024 21:38:39 +0800 Subject: [PATCH 4/9] fix: remove pk weight as pk index is already replaced by weights --- src/mito2/src/memtable/merge_tree/data.rs | 9 +- src/mito2/src/memtable/merge_tree/merger.rs | 154 ++++++++++---------- 2 files changed, 79 insertions(+), 84 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index 5266d87a615c..03202f325d9e 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -692,12 +692,11 @@ impl DataParts { pub fn read(&mut self, pk_weights: Vec) -> Result { let weights = Arc::new(pk_weights); let mut nodes = Vec::with_capacity(self.frozen.len() + 1); - nodes.push(DataNode::new( - DataSource::Buffer(self.active.read(&weights)?), - weights.clone(), - )); + nodes.push(DataNode::new(DataSource::Buffer( + self.active.read(&weights)?, + ))); for p in &self.frozen { - nodes.push(DataNode::new(DataSource::Part(p.read()?), weights.clone())); + nodes.push(DataNode::new(DataSource::Part(p.read()?))); } let merger = Merger::try_new(nodes)?; Ok(DataPartsReader { merger }) } diff --git a/src/mito2/src/memtable/merge_tree/merger.rs b/src/mito2/src/memtable/merge_tree/merger.rs index f8095eadc6c6..16525e6ccbff 100644 --- a/src/mito2/src/memtable/merge_tree/merger.rs +++ b/src/mito2/src/memtable/merge_tree/merger.rs @@ -14,8 +14,8 @@ use std::cmp::{Ordering, Reverse}; use
std::collections::BinaryHeap; +use std::fmt::Debug; use std::ops::Range; -use std::sync::Arc; use datatypes::arrow::array::{ ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, @@ -76,7 +76,7 @@ pub struct Merger { impl Merger where - I: Item, + I: Item + Debug, T: Node, { pub(crate) fn try_new(nodes: Vec) -> Result { @@ -86,7 +86,6 @@ impl Merger heap.push(node); } } - let mut merger = Merger { heap, current_item: None, @@ -130,16 +129,13 @@ impl Merger } }; self.current_item = Some(res); - if top_node.is_valid() { - self.heap.push(top_node); - } } } else { // top is the only node left. self.current_item = Some(top_node.fetch_next()?); - if top_node.is_valid() { - self.heap.push(top_node); - } + } + if top_node.is_valid() { + self.heap.push(top_node); } } else { // heap is empty @@ -264,16 +260,14 @@ impl Item for DataBatch { } pub struct DataNode { - pk_weights: Arc>, source: DataSource, current_data_batch: Option, } impl DataNode { - pub(crate) fn new(source: DataSource, weight: Arc>) -> Self { + pub(crate) fn new(source: DataSource) -> Self { let current_data_batch = source.current_data_batch(); Self { - pk_weights: weight, source, current_data_batch: Some(current_data_batch), } @@ -323,10 +317,13 @@ impl DataSource { impl Ord for DataNode { fn cmp(&self, other: &Self) -> Ordering { - self.current_data_batch() - .current_range() - .start - .cmp(&other.current_data_batch().current_range().start) + let weight = self.current_data_batch().pk_index; + let (ts_start, sequence) = self.current_data_batch().first_row(); + let other_weight = other.current_data_batch().pk_index; + let (other_ts_start, other_sequence) = other.current_data_batch().first_row(); + (weight, ts_start, Reverse(sequence)) + .cmp(&(other_weight, other_ts_start, Reverse(other_sequence))) + .reverse() } } @@ -343,16 +340,7 @@ impl PartialEq for DataNode { #[allow(clippy::non_canonical_partial_ord_impl)] impl PartialOrd for DataNode { fn partial_cmp(&self, other: 
&Self) -> Option { - let weight = self.pk_weights[self.current_data_batch().pk_index as usize]; - let (ts_start, sequence) = self.current_data_batch().first_row(); - let other_weight = self.pk_weights[other.current_data_batch().pk_index as usize]; - let (other_ts_start, other_sequence) = other.current_data_batch().first_row(); - - Some( - (weight, ts_start, Reverse(sequence)) - .cmp(&(other_weight, other_ts_start, Reverse(other_sequence))) - .reverse(), - ) + Some(self.cmp(other)) } } @@ -361,6 +349,7 @@ impl Node for DataNode { fn fetch_next(&mut self) -> Result { let current = self.current_data_batch.take(); + self.next()?; Ok(current.unwrap()) } @@ -373,11 +362,11 @@ impl Node for DataNode { } fn is_behind(&self, other: &Self) -> bool { - let pk_weight = self.pk_weights[self.current_data_batch().pk_index as usize]; - let (start, _) = self.current_data_batch().first_row(); - let other_pk_weight = self.pk_weights[other.current_data_batch().pk_index as usize]; - let (other_end, _) = other.current_data_batch().last_row(); - (pk_weight, start) > (other_pk_weight, other_end) + let pk_weight = self.current_data_batch().pk_index; + let (start, seq) = self.current_data_batch().first_row(); + let other_pk_weight = other.current_data_batch().pk_index; + let (other_end, other_seq) = other.current_data_batch().last_row(); + (pk_weight, start, Reverse(seq)) > (other_pk_weight, other_end, Reverse(other_seq)) } fn skip(&mut self, offset_to_skip: usize) -> Result<()> { @@ -495,33 +484,57 @@ mod tests { fn test_merger() { let metadata = metadata_for_test(); let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10); - let weight = Arc::new(vec![2, 1, 0]); + let weight = &[2, 1, 0]; let mut seq = 0; + write_rows_to_buffer(&mut buffer1, &metadata, 1, vec![2, 3], &mut seq); write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2], &mut seq); - let node1 = DataNode::new( - DataSource::Buffer(buffer1.read(&weight).unwrap()), - weight.clone(), - ); + let node1 = 
DataNode::new(DataSource::Buffer(buffer1.read(weight).unwrap())); let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10); - write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![2, 3], &mut seq); - let node2 = DataNode::new( - DataSource::Buffer(buffer2.read(&weight).unwrap()), - weight.clone(), + write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![3], &mut seq); + write_rows_to_buffer(&mut buffer2, &metadata, 0, vec![1], &mut seq); + let node2 = DataNode::new(DataSource::Buffer(buffer2.read(weight).unwrap())); + + check_merger_read( + vec![node1, node2], + &[ + (1, vec![(2, 0)]), + (1, vec![(3, 4)]), + (1, vec![(3, 1)]), + (2, vec![(1, 5)]), + (2, vec![(1, 2), (2, 3)]), + ], ); + } + + #[test] + fn test_merger2() { + let metadata = metadata_for_test(); + let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10); + let weight = &[2, 1, 0]; + let mut seq = 0; + write_rows_to_buffer(&mut buffer1, &metadata, 1, vec![2, 3], &mut seq); + write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2], &mut seq); + let node1 = DataNode::new(DataSource::Buffer(buffer1.read(weight).unwrap())); + + let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10); + write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![3], &mut seq); + let node2 = DataNode::new(DataSource::Buffer(buffer2.read(weight).unwrap())); let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10); write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2, 3], &mut seq); - let node3 = DataNode::new(DataSource::Buffer(buffer3.read(&weight).unwrap()), weight); + let node3 = DataNode::new(DataSource::Buffer(buffer3.read(weight).unwrap())); check_merger_read( vec![node1, node3, node2], &[ - (2, vec![(1, 0)]), - (2, vec![(2, 4)]), - (2, vec![(2, 1)]), - (2, vec![(3, 5)]), - (1, vec![(2, 2), (3, 3)]), + (1, vec![(2, 0)]), + (1, vec![(3, 4)]), + (1, vec![(3, 1)]), + (2, vec![(1, 2)]), + (2, vec![(2, 5)]), + (2, vec![(2, 3)]), + (2, vec![(3, 6)]), ], ); } @@ -530,24 +543,18 @@ mod tests 
{ fn test_merger_overlapping() { let metadata = metadata_for_test(); let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10); - let weight = Arc::new(vec![0, 1, 2]); + let weight = &[0, 1, 2]; let mut seq = 0; write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2, 3], &mut seq); - let node1 = DataNode::new( - DataSource::Buffer(buffer1.read(&weight).unwrap()), - weight.clone(), - ); + let node1 = DataNode::new(DataSource::Buffer(buffer1.read(weight).unwrap())); let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10); write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![2, 3], &mut seq); - let node2 = DataNode::new( - DataSource::Buffer(buffer2.read(&weight).unwrap()), - weight.clone(), - ); + let node2 = DataNode::new(DataSource::Buffer(buffer2.read(weight).unwrap())); let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10); write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2, 3], &mut seq); - let node3 = DataNode::new(DataSource::Buffer(buffer3.read(&weight).unwrap()), weight); + let node3 = DataNode::new(DataSource::Buffer(buffer3.read(weight).unwrap())); check_merger_read( vec![node1, node3, node2], @@ -566,27 +573,22 @@ mod tests { fn test_merger_parts_and_buffer() { let metadata = metadata_for_test(); let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10); - let weight = Arc::new(vec![0, 1, 2]); + let weight = &[0, 1, 2]; let mut seq = 0; write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2, 3], &mut seq); - let node1 = DataNode::new( - DataSource::Buffer(buffer1.read(&weight).unwrap()), - weight.clone(), - ); + let node1 = DataNode::new(DataSource::Buffer(buffer1.read(weight).unwrap())); let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10); write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![2, 3], &mut seq); - let node2 = DataNode::new( - DataSource::Part(buffer2.freeze(&weight).unwrap().read().unwrap()), - weight.clone(), - ); + let node2 = DataNode::new(DataSource::Part( + 
buffer2.freeze(weight).unwrap().read().unwrap(), + )); let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10); write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2, 3], &mut seq); - let node3 = DataNode::new( - DataSource::Part(buffer3.freeze(&weight).unwrap().read().unwrap()), - weight.clone(), - ); + let node3 = DataNode::new(DataSource::Part( + buffer3.freeze(weight).unwrap().read().unwrap(), + )); check_merger_read( vec![node1, node3, node2], @@ -605,24 +607,18 @@ mod tests { fn test_merger_overlapping_2() { let metadata = metadata_for_test(); let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10); - let weight = Arc::new(vec![0, 1, 2]); + let weight = &[0, 1, 2]; let mut seq = 0; write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2, 2], &mut seq); - let node1 = DataNode::new( - DataSource::Buffer(buffer1.read(&weight).unwrap()), - weight.clone(), - ); + let node1 = DataNode::new(DataSource::Buffer(buffer1.read(weight).unwrap())); let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10); write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2], &mut seq); - let node3 = DataNode::new( - DataSource::Buffer(buffer3.read(&weight).unwrap()), - weight.clone(), - ); + let node3 = DataNode::new(DataSource::Buffer(buffer3.read(weight).unwrap())); let mut buffer4 = DataBuffer::with_capacity(metadata.clone(), 10); write_rows_to_buffer(&mut buffer4, &metadata, 0, vec![2], &mut seq); - let node4 = DataNode::new(DataSource::Buffer(buffer4.read(&weight).unwrap()), weight); + let node4 = DataNode::new(DataSource::Buffer(buffer4.read(weight).unwrap())); check_merger_read( vec![node1, node3, node4], From 30e7b3467508faa1d2147d19b0420f95905bacc9 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Thu, 22 Feb 2024 21:45:56 +0800 Subject: [PATCH 5/9] fix: format --- src/mito2/src/memtable/merge_tree/merger.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/merger.rs 
b/src/mito2/src/memtable/merge_tree/merger.rs index 16525e6ccbff..1673b5ef0bdc 100644 --- a/src/mito2/src/memtable/merge_tree/merger.rs +++ b/src/mito2/src/memtable/merge_tree/merger.rs @@ -75,9 +75,9 @@ pub struct Merger { } impl Merger - where - I: Item + Debug, - T: Node, +where + I: Item + Debug, + T: Node, { pub(crate) fn try_new(nodes: Vec) -> Result { let mut heap = BinaryHeap::with_capacity(nodes.len()); From 7805891a0535e7901ae3733a7b0effed21ca5483 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Fri, 23 Feb 2024 11:08:12 +0800 Subject: [PATCH 6/9] fix: some cr comments --- src/mito2/src/memtable/merge_tree/data.rs | 5 +- src/mito2/src/memtable/merge_tree/merger.rs | 74 +++++++++------------ 2 files changed, 35 insertions(+), 44 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index 03202f325d9e..3ded540a4baf 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -689,11 +689,10 @@ impl DataParts { /// Reads data from all parts including active and frozen parts. /// The returned iterator yields a record batch of one primary key at a time. /// The order of yielding primary keys is determined by provided weights. - pub fn read(&mut self, pk_weights: Vec) -> Result { - let weights = Arc::new(pk_weights); + pub fn read(&mut self, pk_weights: &[u16]) -> Result { let mut nodes = Vec::with_capacity(self.frozen.len() + 1); nodes.push(DataNode::new(DataSource::Buffer( - self.active.read(&weights)?, + self.active.read(pk_weights)?, ))); for p in &self.frozen { nodes.push(DataNode::new(DataSource::Part(p.read()?))); diff --git a/src/mito2/src/memtable/merge_tree/merger.rs b/src/mito2/src/memtable/merge_tree/merger.rs index 1673b5ef0bdc..2091a16160a8 100644 --- a/src/mito2/src/memtable/merge_tree/merger.rs +++ b/src/mito2/src/memtable/merge_tree/merger.rs @@ -33,9 +33,6 @@ pub trait Item: Clone { /// Remaining rows in item. 
fn remaining(&self) -> usize; - /// Whether current item is exhausted. - fn is_empty(&self) -> bool; - /// The key range of item. fn current_range(&self) -> Range; @@ -101,45 +98,45 @@ where /// Advances current merger to next item. pub(crate) fn next(&mut self) -> Result<()> { - if let Some(mut top_node) = self.heap.pop() { - if let Some(next_node) = self.heap.peek() { - if next_node.is_behind(&top_node) { - // does not overlap - self.current_item = Some(top_node.fetch_next()?); - } else { - let next_start = next_node.current_item().current_range().start; - let res = match top_node.current_item().search_key(&next_start) { - Ok(pos) => { - // duplicate timestamp found, yield duplicated row in this item - if pos == 0 { - let to_yield = top_node.current_item().slice(0..1); - top_node.skip(1)?; - to_yield - } else { - let to_yield = top_node.current_item().slice(0..pos); - top_node.skip(pos)?; - to_yield - } - } - Err(pos) => { - // no duplicated timestamp + let Some(mut top_node) = self.heap.pop() else { + // heap is empty + self.current_item = None; + return Ok(()); + }; + if let Some(next_node) = self.heap.peek() { + if next_node.is_behind(&top_node) { + // does not overlap + self.current_item = Some(top_node.fetch_next()?); + } else { + let next_start = next_node.current_item().current_range().start; + let res = match top_node.current_item().search_key(&next_start) { + Ok(pos) => { + // duplicate timestamp found, yield duplicated row in this item + if pos == 0 { + let to_yield = top_node.current_item().slice(0..1); + top_node.skip(1)?; + to_yield + } else { let to_yield = top_node.current_item().slice(0..pos); top_node.skip(pos)?; to_yield } - }; - self.current_item = Some(res); - } - } else { - // top is the only node left. 
- self.current_item = Some(top_node.fetch_next()?); - } - if top_node.is_valid() { - self.heap.push(top_node); + } + Err(pos) => { + // no duplicated timestamp + let to_yield = top_node.current_item().slice(0..pos); + top_node.skip(pos)?; + to_yield + } + }; + self.current_item = Some(res); } } else { - // heap is empty - self.current_item = None; + // top is the only node left. + self.current_item = Some(top_node.fetch_next()?); + } + if top_node.is_valid() { + self.heap.push(top_node); } Ok(()) } @@ -214,10 +211,6 @@ impl Item for DataBatch { self.range().len() } - fn is_empty(&self) -> bool { - self.is_empty() - } - fn current_range(&self) -> Range { let range = self.range(); let batch = self.record_batch(); @@ -337,7 +330,6 @@ impl PartialEq for DataNode { } } -#[allow(clippy::non_canonical_partial_ord_impl)] impl PartialOrd for DataNode { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) From aba5c219492dc079ced984d7d0c87dc6647e2e6a Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Fri, 23 Feb 2024 11:20:34 +0800 Subject: [PATCH 7/9] fix: some cr comments --- src/mito2/src/memtable/merge_tree/data.rs | 1 + src/mito2/src/memtable/merge_tree/merger.rs | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+) diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index 3ded540a4baf..25855cd46d1a 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -689,6 +689,7 @@ impl DataParts { /// Reads data from all parts including active and frozen parts. /// The returned iterator yields a record batch of one primary key at a time. /// The order of yielding primary keys is determined by provided weights. + /// todo(hl): read may not take any pk weights if is read by `Shard`. 
pub fn read(&mut self, pk_weights: &[u16]) -> Result { let mut nodes = Vec::with_capacity(self.frozen.len() + 1); nodes.push(DataNode::new(DataSource::Buffer( diff --git a/src/mito2/src/memtable/merge_tree/merger.rs b/src/mito2/src/memtable/merge_tree/merger.rs index 2091a16160a8..a5b023fbaf7e 100644 --- a/src/mito2/src/memtable/merge_tree/merger.rs +++ b/src/mito2/src/memtable/merge_tree/merger.rs @@ -622,4 +622,23 @@ mod tests { ], ); } + + #[test] + fn test_merger_overlapping_3() { + let metadata = metadata_for_test(); + let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10); + let weight = &[0, 1, 2]; + let mut seq = 0; + write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![0, 1], &mut seq); + let node1 = DataNode::new(DataSource::Buffer(buffer1.read(weight).unwrap())); + + let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10); + write_rows_to_buffer(&mut buffer2, &metadata, 0, vec![1], &mut seq); + let node2 = DataNode::new(DataSource::Buffer(buffer2.read(weight).unwrap())); + + check_merger_read( + vec![node1, node2], + &[(0, vec![(0, 0)]), (0, vec![(1, 2)]), (0, vec![(1, 1)])], + ); + } } From 0a0b55b57454d1869b57125d62729d219796a134 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Fri, 23 Feb 2024 11:50:27 +0800 Subject: [PATCH 8/9] refactor: simply trait's associated types --- src/mito2/src/memtable/merge_tree/merger.rs | 98 +++++++++++---------- 1 file changed, 50 insertions(+), 48 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/merger.rs b/src/mito2/src/memtable/merge_tree/merger.rs index a5b023fbaf7e..c43fd1a3e5a0 100644 --- a/src/mito2/src/memtable/merge_tree/merger.rs +++ b/src/mito2/src/memtable/merge_tree/merger.rs @@ -27,24 +27,9 @@ use crate::error::Result; use crate::memtable::merge_tree::data::{DataBatch, DataBufferReader, DataPartReader}; use crate::memtable::merge_tree::PkIndex; -pub trait Item: Clone { - type Key: Ord; - - /// Remaining rows in item. 
- fn remaining(&self) -> usize; - - /// The key range of item. - fn current_range(&self) -> Range; - - /// Searches given key in current item. - fn search_key(&self, key: &Self::Key) -> std::result::Result; - - /// Slice item. - fn slice(&self, range: Range) -> Self; -} - +/// Nodes of merger's heap. pub trait Node: Ord { - type Item: Item; + type Item; /// Returns current item of node and fetch next. fn fetch_next(&mut self) -> Result; @@ -64,6 +49,12 @@ pub trait Node: Ord { /// # Panics /// If the node is EOF. fn skip(&mut self, offset_to_skip: usize) -> Result<()>; + + /// Searches given item in node's current item and returns the index. + fn search_key_in_current_item(&self, key: &Self::Item) -> std::result::Result; + + /// Slice current item. + fn slice_current_item(&self, range: Range) -> Self::Item; } pub struct Merger { @@ -73,7 +64,6 @@ pub struct Merger { impl Merger where - I: Item + Debug, T: Node, { pub(crate) fn try_new(nodes: Vec) -> Result { @@ -108,23 +98,24 @@ where // does not overlap self.current_item = Some(top_node.fetch_next()?); } else { - let next_start = next_node.current_item().current_range().start; - let res = match top_node.current_item().search_key(&next_start) { + let res = match top_node.search_key_in_current_item(next_node.current_item()) { Ok(pos) => { - // duplicate timestamp found, yield duplicated row in this item if pos == 0 { - let to_yield = top_node.current_item().slice(0..1); + // if the first item of top node has duplicate ts with next node, + // we can simply return the first row in that it must be the one + // with max sequence. 
+ let to_yield = top_node.slice_current_item(0..1); top_node.skip(1)?; to_yield } else { - let to_yield = top_node.current_item().slice(0..pos); + let to_yield = top_node.slice_current_item(0..pos); top_node.skip(pos)?; to_yield } } Err(pos) => { // no duplicated timestamp - let to_yield = top_node.current_item().slice(0..pos); + let to_yield = top_node.slice_current_item(0..pos); top_node.skip(pos)?; to_yield } @@ -204,14 +195,12 @@ impl DataBatch { } } -impl Item for DataBatch { - type Key = DataBatchKey; - +impl DataBatch { fn remaining(&self) -> usize { self.range().len() } - fn current_range(&self) -> Range { + fn current_range(&self) -> Range { let range = self.range(); let batch = self.record_batch(); let pk_index = self.pk_index(); @@ -229,7 +218,7 @@ impl Item for DataBatch { } } - fn search_key(&self, key: &Self::Key) -> std::result::Result { + fn search_key(&self, key: &DataBatchKey) -> std::result::Result { let DataBatchKey { pk_index, timestamp, @@ -267,25 +256,7 @@ impl DataNode { } fn next(&mut self) -> Result<()> { - let next = match &mut self.source { - DataSource::Buffer(b) => { - b.next()?; - if b.is_valid() { - Some(b.current_data_batch()) - } else { - None - } - } - DataSource::Part(p) => { - p.next()?; - if p.is_valid() { - Some(p.current_data_batch()) - } else { - None - } - } - }; - self.current_data_batch = next; + self.current_data_batch = self.source.fetch_next()?; Ok(()) } @@ -306,6 +277,28 @@ impl DataSource { DataSource::Part(p) => p.current_data_batch(), } } + + fn fetch_next(&mut self) -> Result> { + let res = match self { + DataSource::Buffer(b) => { + b.next()?; + if b.is_valid() { + Some(b.current_data_batch()) + } else { + None + } + } + DataSource::Part(p) => { + p.next()?; + if p.is_valid() { + Some(p.current_data_batch()) + } else { + None + } + } + }; + Ok(res) + } } impl Ord for DataNode { @@ -373,6 +366,15 @@ impl Node for DataNode { Ok(()) } + + fn search_key_in_current_item(&self, key: &Self::Item) -> std::result::Result 
{ + let key = key.current_range().start; + self.current_data_batch.as_ref().unwrap().search_key(&key) + } + + fn slice_current_item(&self, range: Range) -> Self::Item { + self.current_data_batch.as_ref().unwrap().slice(range) + } } fn timestamp_array_to_i64_slice(arr: &ArrayRef) -> &[i64] { From d83e3fe8bce8b2232c1d9f327cefea0802ef31ef Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Fri, 23 Feb 2024 13:57:22 +0800 Subject: [PATCH 9/9] fix: some cr comments --- src/mito2/src/memtable/merge_tree/data.rs | 2 +- src/mito2/src/memtable/merge_tree/merger.rs | 25 +++++++++------------ 2 files changed, 11 insertions(+), 16 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index 25855cd46d1a..96db38f673f4 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -709,7 +709,7 @@ impl DataParts { /// Reader for all parts inside a `DataParts`. pub struct DataPartsReader { - merger: Merger, + merger: Merger, } impl DataPartsReader { diff --git a/src/mito2/src/memtable/merge_tree/merger.rs b/src/mito2/src/memtable/merge_tree/merger.rs index c43fd1a3e5a0..7f54183cdd91 100644 --- a/src/mito2/src/memtable/merge_tree/merger.rs +++ b/src/mito2/src/memtable/merge_tree/merger.rs @@ -57,14 +57,14 @@ pub trait Node: Ord { fn slice_current_item(&self, range: Range) -> Self::Item; } -pub struct Merger { +pub struct Merger { heap: BinaryHeap, - current_item: Option, + current_item: Option, } -impl Merger +impl Merger where - T: Node, + T: Node, { pub(crate) fn try_new(nodes: Vec) -> Result { let mut heap = BinaryHeap::with_capacity(nodes.len()); @@ -133,7 +133,7 @@ where } /// Returns current item held by merger. 
- pub(crate) fn current_item(&self) -> &I { + pub(crate) fn current_item(&self) -> &T::Item { self.current_item.as_ref().unwrap() } } @@ -200,7 +200,7 @@ impl DataBatch { self.range().len() } - fn current_range(&self) -> Range { + fn first_key(&self) -> DataBatchKey { let range = self.range(); let batch = self.record_batch(); let pk_index = self.pk_index(); @@ -208,13 +208,10 @@ impl DataBatch { // maybe safe the result somewhere. let ts_values = timestamp_array_to_i64_slice(ts_array); - let (ts_start, ts_end) = (ts_values[range.start], ts_values[range.end - 1]); + let timestamp = ts_values[range.start]; DataBatchKey { pk_index, - timestamp: ts_start, - }..DataBatchKey { - pk_index, - timestamp: ts_end, + timestamp, } } @@ -223,9 +220,7 @@ impl DataBatch { pk_index, timestamp, } = key; - if *pk_index != self.pk_index() { - return Err(self.range().end); - } + assert_eq!(*pk_index, self.pk_index); let ts_values = timestamp_array_to_i64_slice(self.record_batch().column(1)); ts_values.binary_search(timestamp) } @@ -368,7 +363,7 @@ impl Node for DataNode { } fn search_key_in_current_item(&self, key: &Self::Item) -> std::result::Result { - let key = key.current_range().start; + let key = key.first_key(); self.current_data_batch.as_ref().unwrap().search_key(&key) }