diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index b328eb08cc33..9497791d2d3c 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use bytes::Bytes; use datatypes::arrow; -use datatypes::arrow::array::{RecordBatch, UInt16Array, UInt32Array}; +use datatypes::arrow::array::{ArrayRef, RecordBatch, UInt16Array, UInt32Array, UInt64Array}; use datatypes::arrow::datatypes::{Field, Schema, SchemaRef}; use datatypes::data_type::DataType; use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder, Vector, VectorRef}; @@ -42,7 +42,7 @@ use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME}; use crate::error; use crate::error::Result; use crate::memtable::key_values::KeyValue; -use crate::memtable::merge_tree::merger::{DataNode, DataSource, Merger}; +use crate::memtable::merge_tree::merger::{DataBatchKey, DataNode, DataSource, Merger}; use crate::memtable::merge_tree::PkIndex; const PK_INDEX_COLUMN_NAME: &str = "__pk_index"; @@ -50,28 +50,43 @@ const PK_INDEX_COLUMN_NAME: &str = "__pk_index"; /// Initial capacity for the data buffer. pub(crate) const DATA_INIT_CAP: usize = 8; -/// Data part batches returns by `DataParts::read`. -#[derive(Debug, Clone)] -pub struct DataBatch { +/// Range of a data batch. +#[derive(Debug, Clone, Copy)] +pub(crate) struct DataBatchRange { /// Primary key index of this batch. pub(crate) pk_index: PkIndex, + /// Start of current primary key inside record batch. + pub(crate) start: usize, + /// End of current primary key inside record batch. + pub(crate) end: usize, +} + +impl DataBatchRange { + pub(crate) fn len(&self) -> usize { + (self.start..self.end).len() + } + + pub(crate) fn is_empty(&self) -> bool { + (self.start..self.end).is_empty() + } +} + +/// Data part batches returns by `DataParts::read`. +#[derive(Debug, Clone, Copy)] +pub struct DataBatch<'a> { /// Record batch of data. - pub(crate) rb: RecordBatch, + rb: &'a RecordBatch, /// Range of current primary key inside record batch - pub(crate) range: Range, + range: DataBatchRange, } -impl DataBatch { +impl<'a> DataBatch<'a> { pub(crate) fn pk_index(&self) -> PkIndex { - self.pk_index - } - - pub(crate) fn record_batch(&self) -> &RecordBatch { - &self.rb + self.range.pk_index } - pub(crate) fn range(&self) -> Range { - self.range.clone() + pub(crate) fn range(&self) -> DataBatchRange { + self.range } pub(crate) fn is_empty(&self) -> bool { @@ -81,6 +96,73 @@ impl DataBatch { pub(crate) fn slice_record_batch(&self) -> RecordBatch { self.rb.slice(self.range.start, self.range.len()) } + + pub(crate) fn first_row(&self) -> (i64, u64) { + let ts_values = timestamp_array_to_i64_slice(self.rb.column(1)); + let sequence_values = self + .rb + .column(2) + .as_any() + .downcast_ref::() + .unwrap() + .values(); + ( + ts_values[self.range.start], + sequence_values[self.range.start], + ) + } + + pub(crate) fn last_row(&self) -> (i64, u64) { + let ts_values = timestamp_array_to_i64_slice(self.rb.column(1)); + let sequence_values = self + .rb + .column(2) + .as_any() + .downcast_ref::() + .unwrap() + .values(); + ( + ts_values[self.range.end - 1], + sequence_values[self.range.end - 1], + ) + } + + pub(crate) fn first_key(&self) -> DataBatchKey { + let pk_index = self.pk_index(); + let ts_array = self.rb.column(1); + + // maybe safe the result somewhere. 
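+        // The key is `pk_index` plus the first timestamp of this range; the merger
+        // searches it in another node's batch to find where the two batches overlap.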
+ let ts_values = timestamp_array_to_i64_slice(ts_array); + let timestamp = ts_values[self.range.start]; + DataBatchKey { + pk_index, + timestamp, + } + } + + pub(crate) fn search_key(&self, key: &DataBatchKey) -> Result { + let DataBatchKey { + pk_index, + timestamp, + } = key; + assert_eq!(*pk_index, self.range.pk_index); + let ts_values = timestamp_array_to_i64_slice(self.rb.column(1)); + let ts_values = &ts_values[self.range.start..self.range.end]; + ts_values.binary_search(timestamp) + } + + pub(crate) fn slice(self, offset: usize, length: usize) -> DataBatch<'a> { + let start = self.range.start + offset; + let end = start + length; + DataBatch { + rb: self.rb, + range: DataBatchRange { + pk_index: self.range.pk_index, + start, + end, + }, + } + } } /// Buffer for the value part (pk_index, ts, sequence, op_type, field columns) in a shard. @@ -307,11 +389,45 @@ fn data_buffer_to_record_batches( RecordBatch::try_new(schema, columns).context(error::NewRecordBatchSnafu) } +pub(crate) fn timestamp_array_to_i64_slice(arr: &ArrayRef) -> &[i64] { + use datatypes::arrow::array::{ + TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, + TimestampSecondArray, + }; + use datatypes::arrow::datatypes::{DataType, TimeUnit}; + + match arr.data_type() { + DataType::Timestamp(t, _) => match t { + TimeUnit::Second => arr + .as_any() + .downcast_ref::() + .unwrap() + .values(), + TimeUnit::Millisecond => arr + .as_any() + .downcast_ref::() + .unwrap() + .values(), + TimeUnit::Microsecond => arr + .as_any() + .downcast_ref::() + .unwrap() + .values(), + TimeUnit::Nanosecond => arr + .as_any() + .downcast_ref::() + .unwrap() + .values(), + }, + _ => unreachable!(), + } +} + #[derive(Debug)] pub(crate) struct DataBufferReader { batch: RecordBatch, offset: usize, - current_batch: Option<(PkIndex, Range)>, + current_range: Option, } impl DataBufferReader { @@ -319,25 +435,23 @@ impl DataBufferReader { let mut reader = Self { batch, offset: 0, - current_batch: None, + current_range: None, }; reader.next()?; // fill data batch for comparison and merge. Ok(reader) } pub(crate) fn is_valid(&self) -> bool { - self.current_batch.is_some() + self.current_range.is_some() } + /// Returns current data batch. /// # Panics /// If Current reader is exhausted. pub(crate) fn current_data_batch(&self) -> DataBatch { - let (pk_index, range) = self.current_batch.as_ref().unwrap(); - let rb = self.batch.slice(range.start, range.len()); - let range = 0..rb.num_rows(); + let range = self.current_range.unwrap(); DataBatch { - pk_index: *pk_index, - rb, + rb: &self.batch, range, } } @@ -345,22 +459,25 @@ impl DataBufferReader { /// # Panics /// If Current reader is exhausted. pub(crate) fn current_pk_index(&self) -> PkIndex { - let (pk_index, _) = self.current_batch.as_ref().unwrap(); - *pk_index + self.current_range.as_ref().unwrap().pk_index } /// Advances reader to next data batch. 
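+    ///
+    /// Sets `current_range` to `None` once all rows in the buffer are consumed,
+    /// after which `is_valid` returns false.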
pub(crate) fn next(&mut self) -> Result<()> { if self.offset >= self.batch.num_rows() { - self.current_batch = None; + self.current_range = None; return Ok(()); } let pk_index_array = pk_index_array(&self.batch); if let Some((next_pk, range)) = search_next_pk_range(pk_index_array, self.offset) { self.offset = range.end; - self.current_batch = Some((next_pk, range)) + self.current_range = Some(DataBatchRange { + pk_index: next_pk, + start: range.start, + end: range.end, + }); } else { - self.current_batch = None; + self.current_range = None; } Ok(()) } @@ -579,16 +696,14 @@ impl DataPart { pub struct DataPartReader { inner: ParquetRecordBatchReader, - current_range: Range, - current_pk_index: Option, current_batch: Option, + current_range: Option, } impl Debug for DataPartReader { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("DataPartReader") .field("current_range", &self.current_range) - .field("current_pk_index", &self.current_pk_index) .finish() } } @@ -603,9 +718,8 @@ impl DataPartReader { let parquet_reader = builder.build().context(error::ReadDataPartSnafu)?; let mut reader = Self { inner: parquet_reader, - current_pk_index: None, - current_range: 0..0, current_batch: None, + current_range: None, }; reader.next()?; Ok(reader) @@ -613,7 +727,7 @@ impl DataPartReader { /// Returns false if current reader is exhausted. pub(crate) fn is_valid(&self) -> bool { - self.current_pk_index.is_some() + self.current_range.is_some() } /// Returns current pk index. @@ -621,25 +735,16 @@ impl DataPartReader { /// # Panics /// If reader is exhausted. pub(crate) fn current_pk_index(&self) -> PkIndex { - self.current_pk_index.expect("DataPartReader is exhausted") + self.current_range.as_ref().unwrap().pk_index } /// Returns current data batch of reader. /// # Panics /// If reader is exhausted. pub(crate) fn current_data_batch(&self) -> DataBatch { - let pk_index = self.current_pk_index.unwrap(); - let range = self.current_range.clone(); - let rb = self - .current_batch - .as_ref() - .unwrap() - .slice(range.start, range.len()); - - let range = 0..rb.num_rows(); + let range = self.current_range.unwrap(); DataBatch { - pk_index, - rb, + rb: self.current_batch.as_ref().unwrap(), range, } } @@ -647,19 +752,22 @@ impl DataPartReader { pub(crate) fn next(&mut self) -> Result<()> { if let Some((next_pk, range)) = self.search_next_pk_range() { // first try to search next pk in current record batch. - self.current_pk_index = Some(next_pk); - self.current_range = range; + self.current_range = Some(DataBatchRange { + pk_index: next_pk, + start: range.start, + end: range.end, + }); } else { // current record batch reaches eof, fetch next record batch from parquet reader. if let Some(rb) = self.inner.next() { let rb = rb.context(error::ComputeArrowSnafu)?; - self.current_range = 0..0; self.current_batch = Some(rb); + self.current_range = None; return self.next(); } else { // parquet is also exhausted - self.current_pk_index = None; self.current_batch = None; + self.current_range = None; } } @@ -671,7 +779,12 @@ impl DataPartReader { self.current_batch.as_ref().and_then(|b| { // safety: PK_INDEX_COLUMN_NAME must present in record batch yielded by data part. 
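+            // Search the next primary key range, resuming after the current range
+            // if there is one; a freshly loaded record batch starts from row 0.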
let pk_array = pk_index_array(b); - search_next_pk_range(pk_array, self.current_range.end) + let start = self + .current_range + .as_ref() + .map(|range| range.end) + .unwrap_or(0); + search_next_pk_range(pk_array, start) }) } } @@ -741,8 +854,9 @@ pub struct DataPartsReader { } impl DataPartsReader { - pub(crate) fn current_data_batch(&self) -> &DataBatch { - self.merger.current_item() + pub(crate) fn current_data_batch(&self) -> DataBatch { + let batch = self.merger.current_node().current_data_batch(); + batch.slice(0, self.merger.current_rows()) } pub(crate) fn next(&mut self) -> Result<()> { @@ -762,7 +876,6 @@ mod tests { use parquet::data_type::AsBytes; use super::*; - use crate::memtable::merge_tree::merger::timestamp_array_to_i64_slice; use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test}; #[test] @@ -1013,7 +1126,7 @@ mod tests { .zip(sequence.iter()) .map(|(ts, seq)| (*ts, *seq)) .collect::>(); - res.push((batch.pk_index, ts_and_seq)); + res.push((batch.pk_index(), ts_and_seq)); reader.next().unwrap(); } diff --git a/src/mito2/src/memtable/merge_tree/merger.rs b/src/mito2/src/memtable/merge_tree/merger.rs index 509e12e9cd78..a6394ea924fe 100644 --- a/src/mito2/src/memtable/merge_tree/merger.rs +++ b/src/mito2/src/memtable/merge_tree/merger.rs @@ -17,55 +17,46 @@ use std::collections::BinaryHeap; use std::fmt::Debug; use std::ops::Range; -use datatypes::arrow::array::{ - ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, - TimestampSecondArray, UInt64Array, -}; -use datatypes::arrow::datatypes::{DataType, TimeUnit}; - use crate::error::Result; use crate::memtable::merge_tree::data::{DataBatch, DataBufferReader, DataPartReader}; use crate::memtable::merge_tree::PkIndex; /// Nodes of merger's heap. pub trait Node: Ord { - type Item; - - /// Returns current item of node and fetch next. - fn fetch_next(&mut self) -> Result; - /// Returns true if current node is not exhausted. fn is_valid(&self) -> bool; - /// Current item of node. - fn current_item(&self) -> &Self::Item; - /// Whether the other node is behind (exclusive) current node. fn is_behind(&self, other: &Self) -> bool; - /// Skips first `num_to_skip` rows from node's current batch. If current batch is empty it fetches + /// Advances `len` rows from current batch. If current batch is empty it fetches /// next batch from the node. /// /// # Panics - /// If the node is EOF. - fn skip(&mut self, offset_to_skip: usize) -> Result<()>; + /// If the node is invalid. + fn advance(&mut self, len: usize) -> Result<()>; - /// Searches given item in node's current item and returns the index. - fn search_key_in_current_item(&self, key: &Self::Item) -> std::result::Result; + /// Length of current item. + fn current_item_len(&self) -> usize; - /// Slice current item. - fn slice_current_item(&self, range: Range) -> Self::Item; + /// Searches first key of `other` in current item and returns the index. + fn search_key_in_current_item(&self, other: &Self) -> Result; } pub struct Merger { + /// Heap to find node to read. + /// + /// Nodes in the heap are always valid. heap: BinaryHeap, - current_item: Option, + /// Current node to read. + /// + /// The node is always valid if it is not None. + current_node: Option, + /// The number of rows in current node that are valid to read. 
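+    ///
+    /// Rows beyond this count may overlap with other nodes in the heap and are
+    /// only yielded by later calls to `next`.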
+ current_rows: usize, } -impl Merger -where - T: Node, -{ +impl Merger { pub(crate) fn try_new(nodes: Vec) -> Result { let mut heap = BinaryHeap::with_capacity(nodes.len()); for node in nodes { @@ -75,7 +66,8 @@ where } let mut merger = Merger { heap, - current_item: None, + current_node: None, + current_rows: 0, }; merger.next()?; Ok(merger) @@ -83,224 +75,154 @@ where /// Returns true if current merger is still valid. pub(crate) fn is_valid(&self) -> bool { - self.current_item.is_some() + self.current_node.is_some() + } + + /// Returns current node to read. Only [Self::current_rows] rows in current node + /// are valid to read. + /// + /// # Panics + /// Panics if the merger is invalid. + pub(crate) fn current_node(&self) -> &T { + self.current_node.as_ref().unwrap() + } + + /// Returns rows of current node to read. + pub(crate) fn current_rows(&self) -> usize { + self.current_rows } - /// Advances current merger to next item. + /// Advances the merger to the next item. pub(crate) fn next(&mut self) -> Result<()> { - let Some(mut top_node) = self.heap.pop() else { - // heap is empty - self.current_item = None; + self.maybe_advance_current_node()?; + debug_assert!(self.current_node.is_none()); + + // Finds node and range to read from the heap. + let Some(top_node) = self.heap.pop() else { + // Heap is empty. return Ok(()); }; if let Some(next_node) = self.heap.peek() { if next_node.is_behind(&top_node) { - // does not overlap - self.current_item = Some(top_node.fetch_next()?); + // Does not overlap. + self.current_rows = top_node.current_item_len(); } else { - let res = match top_node.search_key_in_current_item(next_node.current_item()) { + // Note that the heap ensures the top node always has the minimal row. + match top_node.search_key_in_current_item(next_node) { Ok(pos) => { if pos == 0 { - // if the first item of top node has duplicate ts with next node, - // we can simply return the first row in that it must be the one + // If the first item of top node has duplicate key with the next node, + // we can simply return the first row in the top node as it must be the one // with max sequence. - let to_yield = top_node.slice_current_item(0..1); - top_node.skip(1)?; - to_yield + self.current_rows = 1; } else { - let to_yield = top_node.slice_current_item(0..pos); - top_node.skip(pos)?; - to_yield + // We don't know which one has the larger sequence so we use the range before + // the duplicate pos. + self.current_rows = pos; } } Err(pos) => { - // no duplicated timestamp - let to_yield = top_node.slice_current_item(0..pos); - top_node.skip(pos)?; - to_yield + // No duplication. Output rows before pos. + debug_assert!(pos > 0); + self.current_rows = pos; } - }; - self.current_item = Some(res); + } } } else { - // top is the only node left. - self.current_item = Some(top_node.fetch_next()?); + // Top is the only node left. We can read all rows in it. + self.current_rows = top_node.current_item_len(); } - if top_node.is_valid() { - self.heap.push(top_node); - } - Ok(()) - } + self.current_node = Some(top_node); - /// Returns current item held by merger. 
- pub(crate) fn current_item(&self) -> &T::Item { - self.current_item.as_ref().unwrap() + Ok(()) } -} - -#[derive(Debug)] -pub struct DataBatchKey { - pk_index: PkIndex, - timestamp: i64, -} -impl Eq for DataBatchKey {} + fn maybe_advance_current_node(&mut self) -> Result<()> { + let Some(mut node) = self.current_node.take() else { + return Ok(()); + }; -impl PartialEq for DataBatchKey { - fn eq(&self, other: &Self) -> bool { - self.pk_index == other.pk_index && self.timestamp == other.timestamp - } -} + // Advances current node. + node.advance(self.current_rows)?; + self.current_rows = 0; + if !node.is_valid() { + return Ok(()); + } -impl PartialOrd for DataBatchKey { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) + // Puts the node into the heap. + self.heap.push(node); + Ok(()) } } -impl Ord for DataBatchKey { - fn cmp(&self, other: &Self) -> Ordering { - self.pk_index - .cmp(&other.pk_index) - .then(self.timestamp.cmp(&other.timestamp)) - .reverse() - } +#[derive(Debug)] +pub(crate) struct DataBatchKey { + pub(crate) pk_index: PkIndex, + pub(crate) timestamp: i64, } -impl DataBatch { - fn first_row(&self) -> (i64, u64) { - let range = self.range(); - let ts_values = timestamp_array_to_i64_slice(self.rb.column(1)); - let sequence_values = self - .rb - .column(2) - .as_any() - .downcast_ref::() - .unwrap() - .values(); - (ts_values[range.start], sequence_values[range.start]) - } - - fn last_row(&self) -> (i64, u64) { - let range = self.range(); - let ts_values = timestamp_array_to_i64_slice(self.rb.column(1)); - let sequence_values = self - .rb - .column(2) - .as_any() - .downcast_ref::() - .unwrap() - .values(); - (ts_values[range.end - 1], sequence_values[range.end - 1]) - } +pub(crate) enum DataSource { + Buffer(DataBufferReader), + Part(DataPartReader), } -impl DataBatch { - fn remaining(&self) -> usize { - self.range().len() - } - - fn first_key(&self) -> DataBatchKey { - let range = self.range(); - let batch = self.record_batch(); - let pk_index = self.pk_index(); - let ts_array = batch.column(1); - - // maybe safe the result somewhere. - let ts_values = timestamp_array_to_i64_slice(ts_array); - let timestamp = ts_values[range.start]; - DataBatchKey { - pk_index, - timestamp, +impl DataSource { + fn current_data_batch(&self) -> DataBatch { + match self { + DataSource::Buffer(buffer) => buffer.current_data_batch(), + DataSource::Part(p) => p.current_data_batch(), } } - fn search_key(&self, key: &DataBatchKey) -> std::result::Result { - let DataBatchKey { - pk_index, - timestamp, - } = key; - assert_eq!(*pk_index, self.pk_index); - let ts_values = timestamp_array_to_i64_slice(self.record_batch().column(1)); - ts_values.binary_search(timestamp) + fn is_valid(&self) -> bool { + match self { + DataSource::Buffer(b) => b.is_valid(), + DataSource::Part(p) => p.is_valid(), + } } - fn slice(&self, range: Range) -> Self { - let rb = self.rb.slice(range.start, range.len()); - let range = 0..rb.num_rows(); - Self { - pk_index: self.pk_index, - rb, - range, + fn next(&mut self) -> Result<()> { + match self { + DataSource::Buffer(b) => b.next(), + DataSource::Part(p) => p.next(), } } } -pub struct DataNode { +pub(crate) struct DataNode { source: DataSource, - current_data_batch: Option, + /// Current range of the batch in the source. 
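+    ///
+    /// `None` means the underlying source is exhausted.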
+ current_range: Option>, } impl DataNode { pub(crate) fn new(source: DataSource) -> Self { - let current_data_batch = source.current_data_batch(); + let current_range = source + .is_valid() + .then(|| 0..source.current_data_batch().range().len()); + Self { source, - current_data_batch: Some(current_data_batch), + current_range, } } - fn next(&mut self) -> Result<()> { - self.current_data_batch = self.source.fetch_next()?; - Ok(()) - } - - fn current_data_batch(&self) -> &DataBatch { - self.current_data_batch.as_ref().unwrap() - } -} - -pub enum DataSource { - Buffer(DataBufferReader), - Part(DataPartReader), -} - -impl DataSource { pub(crate) fn current_data_batch(&self) -> DataBatch { - match self { - DataSource::Buffer(buffer) => buffer.current_data_batch(), - DataSource::Part(p) => p.current_data_batch(), - } + let range = self.current_range(); + let batch = self.source.current_data_batch(); + batch.slice(range.start, range.len()) } - fn fetch_next(&mut self) -> Result> { - let res = match self { - DataSource::Buffer(b) => { - b.next()?; - if b.is_valid() { - Some(b.current_data_batch()) - } else { - None - } - } - DataSource::Part(p) => { - p.next()?; - if p.is_valid() { - Some(p.current_data_batch()) - } else { - None - } - } - }; - Ok(res) + fn current_range(&self) -> Range { + self.current_range.clone().unwrap() } } impl Ord for DataNode { fn cmp(&self, other: &Self) -> Ordering { - let weight = self.current_data_batch().pk_index; + let weight = self.current_data_batch().pk_index(); let (ts_start, sequence) = self.current_data_batch().first_row(); - let other_weight = other.current_data_batch().pk_index; + let other_weight = other.current_data_batch().pk_index(); let (other_ts_start, other_sequence) = other.current_data_batch().first_row(); (weight, ts_start, Reverse(sequence)) .cmp(&(other_weight, other_ts_start, Reverse(other_sequence))) @@ -325,78 +247,47 @@ impl PartialOrd for DataNode { } impl Node for DataNode { - type Item = DataBatch; - - fn fetch_next(&mut self) -> Result { - let current = self.current_data_batch.take(); - self.next()?; - Ok(current.unwrap()) - } - fn is_valid(&self) -> bool { - self.current_data_batch.is_some() - } - - fn current_item(&self) -> &Self::Item { - self.current_data_batch() + self.current_range.is_some() } fn is_behind(&self, other: &Self) -> bool { - let pk_weight = self.current_data_batch().pk_index; + let pk_weight = self.current_data_batch().pk_index(); let (start, seq) = self.current_data_batch().first_row(); - let other_pk_weight = other.current_data_batch().pk_index; + let other_pk_weight = other.current_data_batch().pk_index(); let (other_end, other_seq) = other.current_data_batch().last_row(); (pk_weight, start, Reverse(seq)) > (other_pk_weight, other_end, Reverse(other_seq)) } - fn skip(&mut self, offset_to_skip: usize) -> Result<()> { - let current = self.current_item(); - let remaining = current.remaining() - offset_to_skip; + fn advance(&mut self, len: usize) -> Result<()> { + let mut range = self.current_range(); + debug_assert!(range.len() >= len); + + let remaining = range.len() - len; if remaining == 0 { - self.next()?; + // Nothing remains, we need to fetch next batch to ensure the current batch is not empty. + self.source.next()?; + if self.source.is_valid() { + self.current_range = Some(0..self.source.current_data_batch().range().len()); + } else { + // The node is exhausted. 
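+                // `is_valid` will return false, so the merger drops this node
+                // instead of pushing it back to the heap.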
+ self.current_range = None; + } } else { - let end = current.remaining(); - self.current_data_batch = Some(current.slice(offset_to_skip..end)); + range.start += len; + self.current_range = Some(range); } Ok(()) } - fn search_key_in_current_item(&self, key: &Self::Item) -> std::result::Result { - let key = key.first_key(); - self.current_data_batch.as_ref().unwrap().search_key(&key) + fn current_item_len(&self) -> usize { + self.current_range.clone().unwrap().len() } - fn slice_current_item(&self, range: Range) -> Self::Item { - self.current_data_batch.as_ref().unwrap().slice(range) - } -} - -pub(crate) fn timestamp_array_to_i64_slice(arr: &ArrayRef) -> &[i64] { - match arr.data_type() { - DataType::Timestamp(t, _) => match t { - TimeUnit::Second => arr - .as_any() - .downcast_ref::() - .unwrap() - .values(), - TimeUnit::Millisecond => arr - .as_any() - .downcast_ref::() - .unwrap() - .values(), - TimeUnit::Microsecond => arr - .as_any() - .downcast_ref::() - .unwrap() - .values(), - TimeUnit::Nanosecond => arr - .as_any() - .downcast_ref::() - .unwrap() - .values(), - }, - _ => unreachable!(), + fn search_key_in_current_item(&self, other: &Self) -> Result { + let key = other.current_data_batch().first_key(); + self.current_data_batch().search_key(&key) } } @@ -406,7 +297,7 @@ mod tests { use store_api::metadata::RegionMetadataRef; use super::*; - use crate::memtable::merge_tree::data::DataBuffer; + use crate::memtable::merge_tree::data::{timestamp_array_to_i64_slice, DataBuffer}; use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test}; fn write_rows_to_buffer( @@ -439,7 +330,8 @@ mod tests { let mut res = vec![]; while merger.is_valid() { - let data_batch = merger.current_item(); + let data_batch = merger.current_node().current_data_batch(); + let data_batch = data_batch.slice(0, merger.current_rows()); let batch = data_batch.slice_record_batch(); let ts_array = batch.column(1); let ts_values: Vec<_> = timestamp_array_to_i64_slice(ts_array).to_vec(); @@ -456,7 +348,7 @@ mod tests { .map(|(ts, seq)| (ts, seq.unwrap())) .collect::>(); - res.push((data_batch.pk_index, ts_and_seq)); + res.push((data_batch.pk_index(), ts_and_seq)); merger.next().unwrap(); } assert_eq!(expected, &res);