Skip to content

Commit

Permalink
refactor: Remove Item from merger's Node trait (#3371)
Browse files Browse the repository at this point in the history
* refactor: data reader returns reference to data batch

* refactor: use range to create merger

* chore: Reference RecordBatch in DataBatch

* fix: top node not read if no next node

* refactor: move timestamp_array_to_i64_slice to data mod

* style: fix cilppy

* chore: derive copy for DataBatch

* chore: address CR comments
  • Loading branch information
evenyag authored Feb 24, 2024
1 parent a6564e7 commit 1df64f2
Show file tree
Hide file tree
Showing 2 changed files with 303 additions and 298 deletions.
223 changes: 168 additions & 55 deletions src/mito2/src/memtable/merge_tree/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::sync::Arc;

use bytes::Bytes;
use datatypes::arrow;
use datatypes::arrow::array::{RecordBatch, UInt16Array, UInt32Array};
use datatypes::arrow::array::{ArrayRef, RecordBatch, UInt16Array, UInt32Array, UInt64Array};
use datatypes::arrow::datatypes::{Field, Schema, SchemaRef};
use datatypes::data_type::DataType;
use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder, Vector, VectorRef};
Expand All @@ -42,36 +42,51 @@ use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME};
use crate::error;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::merger::{DataNode, DataSource, Merger};
use crate::memtable::merge_tree::merger::{DataBatchKey, DataNode, DataSource, Merger};
use crate::memtable::merge_tree::PkIndex;

const PK_INDEX_COLUMN_NAME: &str = "__pk_index";

/// Initial capacity for the data buffer.
pub(crate) const DATA_INIT_CAP: usize = 8;

/// Data part batches returns by `DataParts::read`.
#[derive(Debug, Clone)]
pub struct DataBatch {
/// Range of a data batch.
#[derive(Debug, Clone, Copy)]
pub(crate) struct DataBatchRange {
/// Primary key index of this batch.
pub(crate) pk_index: PkIndex,
/// Start of current primary key inside record batch.
pub(crate) start: usize,
/// End of current primary key inside record batch.
pub(crate) end: usize,
}

impl DataBatchRange {
pub(crate) fn len(&self) -> usize {
(self.start..self.end).len()
}

pub(crate) fn is_empty(&self) -> bool {
(self.start..self.end).is_empty()
}
}

/// Data part batches returns by `DataParts::read`.
#[derive(Debug, Clone, Copy)]
pub struct DataBatch<'a> {
/// Record batch of data.
pub(crate) rb: RecordBatch,
rb: &'a RecordBatch,
/// Range of current primary key inside record batch
pub(crate) range: Range<usize>,
range: DataBatchRange,
}

impl DataBatch {
impl<'a> DataBatch<'a> {
pub(crate) fn pk_index(&self) -> PkIndex {
self.pk_index
}

pub(crate) fn record_batch(&self) -> &RecordBatch {
&self.rb
self.range.pk_index
}

pub(crate) fn range(&self) -> Range<usize> {
self.range.clone()
pub(crate) fn range(&self) -> DataBatchRange {
self.range
}

pub(crate) fn is_empty(&self) -> bool {
Expand All @@ -81,6 +96,73 @@ impl DataBatch {
pub(crate) fn slice_record_batch(&self) -> RecordBatch {
self.rb.slice(self.range.start, self.range.len())
}

pub(crate) fn first_row(&self) -> (i64, u64) {
let ts_values = timestamp_array_to_i64_slice(self.rb.column(1));
let sequence_values = self
.rb
.column(2)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap()
.values();
(
ts_values[self.range.start],
sequence_values[self.range.start],
)
}

pub(crate) fn last_row(&self) -> (i64, u64) {
let ts_values = timestamp_array_to_i64_slice(self.rb.column(1));
let sequence_values = self
.rb
.column(2)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap()
.values();
(
ts_values[self.range.end - 1],
sequence_values[self.range.end - 1],
)
}

pub(crate) fn first_key(&self) -> DataBatchKey {
let pk_index = self.pk_index();
let ts_array = self.rb.column(1);

// maybe safe the result somewhere.
let ts_values = timestamp_array_to_i64_slice(ts_array);
let timestamp = ts_values[self.range.start];
DataBatchKey {
pk_index,
timestamp,
}
}

pub(crate) fn search_key(&self, key: &DataBatchKey) -> Result<usize, usize> {
let DataBatchKey {
pk_index,
timestamp,
} = key;
assert_eq!(*pk_index, self.range.pk_index);
let ts_values = timestamp_array_to_i64_slice(self.rb.column(1));
let ts_values = &ts_values[self.range.start..self.range.end];
ts_values.binary_search(timestamp)
}

pub(crate) fn slice(self, offset: usize, length: usize) -> DataBatch<'a> {
let start = self.range.start + offset;
let end = start + length;
DataBatch {
rb: self.rb,
range: DataBatchRange {
pk_index: self.range.pk_index,
start,
end,
},
}
}
}

/// Buffer for the value part (pk_index, ts, sequence, op_type, field columns) in a shard.
Expand Down Expand Up @@ -307,60 +389,95 @@ fn data_buffer_to_record_batches(
RecordBatch::try_new(schema, columns).context(error::NewRecordBatchSnafu)
}

pub(crate) fn timestamp_array_to_i64_slice(arr: &ArrayRef) -> &[i64] {
use datatypes::arrow::array::{
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray,
};
use datatypes::arrow::datatypes::{DataType, TimeUnit};

match arr.data_type() {
DataType::Timestamp(t, _) => match t {
TimeUnit::Second => arr
.as_any()
.downcast_ref::<TimestampSecondArray>()
.unwrap()
.values(),
TimeUnit::Millisecond => arr
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.values(),
TimeUnit::Microsecond => arr
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap()
.values(),
TimeUnit::Nanosecond => arr
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap()
.values(),
},
_ => unreachable!(),
}
}

#[derive(Debug)]
pub(crate) struct DataBufferReader {
batch: RecordBatch,
offset: usize,
current_batch: Option<(PkIndex, Range<usize>)>,
current_range: Option<DataBatchRange>,
}

impl DataBufferReader {
pub(crate) fn new(batch: RecordBatch) -> Result<Self> {
let mut reader = Self {
batch,
offset: 0,
current_batch: None,
current_range: None,
};
reader.next()?; // fill data batch for comparison and merge.
Ok(reader)
}

pub(crate) fn is_valid(&self) -> bool {
self.current_batch.is_some()
self.current_range.is_some()
}

/// Returns current data batch.
/// # Panics
/// If Current reader is exhausted.
pub(crate) fn current_data_batch(&self) -> DataBatch {
let (pk_index, range) = self.current_batch.as_ref().unwrap();
let rb = self.batch.slice(range.start, range.len());
let range = 0..rb.num_rows();
let range = self.current_range.unwrap();
DataBatch {
pk_index: *pk_index,
rb,
rb: &self.batch,
range,
}
}

/// # Panics
/// If Current reader is exhausted.
pub(crate) fn current_pk_index(&self) -> PkIndex {
let (pk_index, _) = self.current_batch.as_ref().unwrap();
*pk_index
self.current_range.as_ref().unwrap().pk_index
}

/// Advances reader to next data batch.
pub(crate) fn next(&mut self) -> Result<()> {
if self.offset >= self.batch.num_rows() {
self.current_batch = None;
self.current_range = None;
return Ok(());
}
let pk_index_array = pk_index_array(&self.batch);
if let Some((next_pk, range)) = search_next_pk_range(pk_index_array, self.offset) {
self.offset = range.end;
self.current_batch = Some((next_pk, range))
self.current_range = Some(DataBatchRange {
pk_index: next_pk,
start: range.start,
end: range.end,
});
} else {
self.current_batch = None;
self.current_range = None;
}
Ok(())
}
Expand Down Expand Up @@ -579,16 +696,14 @@ impl DataPart {

pub struct DataPartReader {
inner: ParquetRecordBatchReader,
current_range: Range<usize>,
current_pk_index: Option<PkIndex>,
current_batch: Option<RecordBatch>,
current_range: Option<DataBatchRange>,
}

impl Debug for DataPartReader {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DataPartReader")
.field("current_range", &self.current_range)
.field("current_pk_index", &self.current_pk_index)
.finish()
}
}
Expand All @@ -603,63 +718,56 @@ impl DataPartReader {
let parquet_reader = builder.build().context(error::ReadDataPartSnafu)?;
let mut reader = Self {
inner: parquet_reader,
current_pk_index: None,
current_range: 0..0,
current_batch: None,
current_range: None,
};
reader.next()?;
Ok(reader)
}

/// Returns false if current reader is exhausted.
pub(crate) fn is_valid(&self) -> bool {
self.current_pk_index.is_some()
self.current_range.is_some()
}

/// Returns current pk index.
///
/// # Panics
/// If reader is exhausted.
pub(crate) fn current_pk_index(&self) -> PkIndex {
self.current_pk_index.expect("DataPartReader is exhausted")
self.current_range.as_ref().unwrap().pk_index
}

/// Returns current data batch of reader.
/// # Panics
/// If reader is exhausted.
pub(crate) fn current_data_batch(&self) -> DataBatch {
let pk_index = self.current_pk_index.unwrap();
let range = self.current_range.clone();
let rb = self
.current_batch
.as_ref()
.unwrap()
.slice(range.start, range.len());

let range = 0..rb.num_rows();
let range = self.current_range.unwrap();
DataBatch {
pk_index,
rb,
rb: self.current_batch.as_ref().unwrap(),
range,
}
}

pub(crate) fn next(&mut self) -> Result<()> {
if let Some((next_pk, range)) = self.search_next_pk_range() {
// first try to search next pk in current record batch.
self.current_pk_index = Some(next_pk);
self.current_range = range;
self.current_range = Some(DataBatchRange {
pk_index: next_pk,
start: range.start,
end: range.end,
});
} else {
// current record batch reaches eof, fetch next record batch from parquet reader.
if let Some(rb) = self.inner.next() {
let rb = rb.context(error::ComputeArrowSnafu)?;
self.current_range = 0..0;
self.current_batch = Some(rb);
self.current_range = None;
return self.next();
} else {
// parquet is also exhausted
self.current_pk_index = None;
self.current_batch = None;
self.current_range = None;
}
}

Expand All @@ -671,7 +779,12 @@ impl DataPartReader {
self.current_batch.as_ref().and_then(|b| {
// safety: PK_INDEX_COLUMN_NAME must present in record batch yielded by data part.
let pk_array = pk_index_array(b);
search_next_pk_range(pk_array, self.current_range.end)
let start = self
.current_range
.as_ref()
.map(|range| range.end)
.unwrap_or(0);
search_next_pk_range(pk_array, start)
})
}
}
Expand Down Expand Up @@ -741,8 +854,9 @@ pub struct DataPartsReader {
}

impl DataPartsReader {
pub(crate) fn current_data_batch(&self) -> &DataBatch {
self.merger.current_item()
pub(crate) fn current_data_batch(&self) -> DataBatch {
let batch = self.merger.current_node().current_data_batch();
batch.slice(0, self.merger.current_rows())
}

pub(crate) fn next(&mut self) -> Result<()> {
Expand All @@ -762,7 +876,6 @@ mod tests {
use parquet::data_type::AsBytes;

use super::*;
use crate::memtable::merge_tree::merger::timestamp_array_to_i64_slice;
use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};

#[test]
Expand Down Expand Up @@ -1013,7 +1126,7 @@ mod tests {
.zip(sequence.iter())
.map(|(ts, seq)| (*ts, *seq))
.collect::<Vec<_>>();
res.push((batch.pk_index, ts_and_seq));
res.push((batch.pk_index(), ts_and_seq));

reader.next().unwrap();
}
Expand Down
Loading

0 comments on commit 1df64f2

Please sign in to comment.