feat: merge tree data parts (#3346)
* feat: add iter method for DataPart

* chore: rename iter to reader

* chore: some doc

* fix: resolve some comments

* fix: remove metadata in DataPart
v0y4g3r authored Feb 21, 2024
1 parent 4c07606 commit 90169c8
Showing 2 changed files with 221 additions and 28 deletions.
8 changes: 7 additions & 1 deletion src/mito2/src/error.rs
@@ -566,6 +566,12 @@ pub enum Error {
error: parquet::errors::ParquetError,
location: Location,
},

#[snafu(display("Failed to iter data part"))]
ReadDataPart {
#[snafu(source)]
error: parquet::errors::ParquetError,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -669,7 +675,7 @@ impl ErrorExt for Error {
FilterRecordBatch { source, .. } => source.status_code(),
Upload { .. } => StatusCode::StorageUnavailable,
BiError { .. } => StatusCode::Internal,
- EncodeMemtable { .. } => StatusCode::Internal,
+ EncodeMemtable { .. } | ReadDataPart { .. } => StatusCode::Internal,
}
}
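The new variant follows snafu's source-wrapping pattern used across this file: the parquet error becomes the variant's source via `#[snafu(source)]`, and call sites attach it with `ResultExt::context`. A minimal self-contained sketch of that pattern (hypothetical standalone code, not part of this commit):

use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    /// Mirrors the variant above; `error` is marked as the source field.
    #[snafu(display("Failed to read data part"))]
    ReadDataPart {
        #[snafu(source)]
        error: parquet::errors::ParquetError,
    },
}

fn open(data: bytes::Bytes) -> Result<parquet::arrow::arrow_reader::ParquetRecordBatchReader, Error> {
    // `context` converts the parquet error into `Error::ReadDataPart`
    // through the generated `ReadDataPartSnafu` selector.
    parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(data)
        .context(ReadDataPartSnafu)?
        .build()
        .context(ReadDataPartSnafu)
}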

241 changes: 214 additions & 27 deletions src/mito2/src/memtable/merge_tree/data.rs
@@ -15,6 +15,7 @@
//! Data part of a shard.
use std::cmp::{Ordering, Reverse};
use std::fmt::{Debug, Formatter};
use std::ops::Range;
use std::sync::Arc;

@@ -31,6 +32,7 @@ use datatypes::vectors::{
TimestampSecondVector, UInt16Vector, UInt16VectorBuilder, UInt64Vector, UInt64VectorBuilder,
UInt8VectorBuilder,
};
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use snafu::ResultExt;
@@ -140,13 +142,13 @@ impl DataBuffer {
/// `freeze` clears the buffers of builders.
pub fn freeze(&mut self, pk_weights: &[u16]) -> Result<DataPart> {
let encoder = DataPartEncoder::new(&self.metadata, pk_weights, None);
- let encoded = encoder.write(self)?;
- Ok(DataPart::Parquet(encoded))
+ let parts = encoder.write(self)?;
+ Ok(parts)
}

/// Reads batches from data buffer without resetting builder's buffers.
- pub fn iter(&mut self, pk_weights: &[u16]) -> Result<DataBufferIter> {
- // todo(hl): control whether to dedup while invoking `iter`.
+ pub fn read(&mut self, pk_weights: &[u16]) -> Result<DataBufferReader> {
+ // todo(hl): control whether to dedup while invoking `read`.
let batch = data_buffer_to_record_batches(
self.data_part_schema.clone(),
self,
Expand All @@ -155,7 +157,7 @@ impl DataBuffer {
true,
true,
)?;
- DataBufferIter::new(batch)
+ DataBufferReader::new(batch)
}

/// Returns num of rows in data buffer.
@@ -287,29 +289,29 @@ fn data_buffer_to_record_batches(
}

#[derive(Debug)]
- pub(crate) struct DataBufferIter {
+ pub(crate) struct DataBufferReader {
batch: RecordBatch,
offset: usize,
current_batch: Option<(PkIndex, Range<usize>)>,
}

- impl DataBufferIter {
+ impl DataBufferReader {
pub(crate) fn new(batch: RecordBatch) -> Result<Self> {
- let mut iter = Self {
+ let mut reader = Self {
batch,
offset: 0,
current_batch: None,
};
- iter.next()?; // fill data batch for comparison and merge.
- Ok(iter)
+ reader.next()?; // fill data batch for comparison and merge.
+ Ok(reader)
}

pub(crate) fn is_valid(&self) -> bool {
self.current_batch.is_some()
}

/// # Panics
- /// If Current iterator is not exhausted.
+ /// If the current reader is exhausted.
pub(crate) fn current_data_batch(&self) -> DataBatch {
let (pk_index, range) = self.current_batch.as_ref().unwrap();
DataBatch {
@@ -320,13 +322,13 @@ impl DataBufferIter {
}

/// # Panics
- /// If Current iterator is exhausted.
+ /// If the current reader is exhausted.
pub(crate) fn current_pk_index(&self) -> PkIndex {
let (pk_index, _) = self.current_batch.as_ref().unwrap();
*pk_index
}

- /// Advances iterator to next data batch.
+ /// Advances the reader to the next data batch.
pub(crate) fn next(&mut self) -> Result<()> {
if self.offset >= self.batch.num_rows() {
self.current_batch = None;
@@ -506,7 +508,7 @@ impl<'a> DataPartEncoder<'a> {
.build()
})
}
- pub fn write(&self, source: &mut DataBuffer) -> Result<Bytes> {
+ pub fn write(&self, source: &mut DataBuffer) -> Result<DataPart> {
let mut bytes = Vec::with_capacity(1024);
let mut writer = ArrowWriter::try_new(&mut bytes, self.schema.clone(), self.writer_props())
.context(error::EncodeMemtableSnafu)?;
@@ -519,26 +521,138 @@ impl<'a> DataPartEncoder<'a> {
true,
)?;
writer.write(&rb).context(error::EncodeMemtableSnafu)?;
- let _file_meta = writer.close().context(error::EncodeMemtableSnafu)?;
- Ok(Bytes::from(bytes))
+ let _metadata = writer.close().context(error::EncodeMemtableSnafu)?;
+ Ok(DataPart::Parquet(ParquetPart {
+ data: Bytes::from(bytes),
+ }))
}
}

/// Data parts under a shard.
pub struct DataParts {
/// The active writing buffer.
pub(crate) active: DataBuffer,
/// Immutable (encoded) parts.
pub(crate) frozen: Vec<DataPart>,
}
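The struct pairs one mutable buffer with a list of encoded parts. A hedged sketch of the freeze flow this implies, using `DataBuffer::freeze` from above (`maybe_freeze` and its threshold are hypothetical, not part of this commit):

impl DataParts {
    /// Hypothetical helper: freeze the active buffer once it is large enough.
    pub(crate) fn maybe_freeze(&mut self, pk_weights: &[u16], threshold: usize) -> Result<()> {
        if self.active.num_rows() >= threshold {
            // `freeze` encodes the buffer to parquet and clears its builders.
            let part = self.active.freeze(pk_weights)?;
            self.frozen.push(part);
        }
        Ok(())
    }
}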

/// Format of immutable data part.
pub enum DataPart {
- Parquet(Bytes),
+ Parquet(ParquetPart),
}

impl DataPart {
fn is_empty(&self) -> bool {
match self {
- DataPart::Parquet(data) => data.is_empty(),
+ DataPart::Parquet(p) => p.data.is_empty(),
}
}

/// Reads the frozen data part and yields [DataBatch]es.
pub fn read(&self) -> Result<DataPartReader> {
match self {
DataPart::Parquet(data_bytes) => DataPartReader::new(data_bytes.data.clone(), None),
}
}
}
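`read` gives frozen parts the same cursor-style interface as `DataBufferReader`: check `is_valid`, take `current_data_batch`, advance with `next`. A sketch of the consuming loop, using only names from this diff (the `drain` wrapper itself is hypothetical):

fn drain(part: &DataPart) -> Result<()> {
    let mut reader = part.read()?;
    while reader.is_valid() {
        // Each DataBatch covers a contiguous run of rows sharing one pk_index.
        let _batch = reader.current_data_batch();
        reader.next()?;
    }
    Ok(())
}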

- /// Data parts under a shard.
- pub struct DataParts {}
pub struct DataPartReader {
inner: ParquetRecordBatchReader,
current_range: Range<usize>,
current_pk_index: Option<PkIndex>,
current_batch: Option<RecordBatch>,
}

impl Debug for DataPartReader {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DataPartReader")
.field("current_range", &self.current_range)
.field("current_pk_index", &self.current_pk_index)
.finish()
}
}

impl DataPartReader {
pub fn new(data: Bytes, batch_size: Option<usize>) -> Result<Self> {
let mut builder =
ParquetRecordBatchReaderBuilder::try_new(data).context(error::ReadDataPartSnafu)?;
if let Some(batch_size) = batch_size {
builder = builder.with_batch_size(batch_size);
}
let parquet_reader = builder.build().context(error::ReadDataPartSnafu)?;
let mut reader = Self {
inner: parquet_reader,
current_pk_index: None,
current_range: 0..0,
current_batch: None,
};
reader.next()?;
Ok(reader)
}

/// Returns false if the reader is exhausted.
pub(crate) fn is_valid(&self) -> bool {
self.current_pk_index.is_some()
}

/// Returns current pk index.
///
/// # Panics
/// If reader is exhausted.
pub(crate) fn current_pk_index(&self) -> PkIndex {
self.current_pk_index.expect("DataPartReader is exhausted")
}

/// Returns the current data batch of the reader.
/// # Panics
/// If reader is exhausted.
pub(crate) fn current_data_batch(&self) -> DataBatch {
let rb = self.current_batch.as_ref().unwrap();
let pk_index = self.current_pk_index.unwrap();
let range = self.current_range.clone();
DataBatch {
pk_index,
rb,
range,
}
}

pub(crate) fn next(&mut self) -> Result<()> {
if let Some((next_pk, range)) = self.search_next_pk_range() {
// first try to search next pk in current record batch.
self.current_pk_index = Some(next_pk);
self.current_range = range;
} else {
// current record batch reaches eof, fetch next record batch from parquet reader.
if let Some(rb) = self.inner.next() {
let rb = rb.context(error::ComputeArrowSnafu)?;
self.current_range = 0..0;
self.current_batch = Some(rb);
return self.next();
} else {
// parquet is also exhausted
self.current_pk_index = None;
self.current_batch = None;
}
}

Ok(())
}

/// Searches for the next primary key along with its offset range inside the record batch.
fn search_next_pk_range(&self) -> Option<(PkIndex, Range<usize>)> {
self.current_batch.as_ref().and_then(|b| {
// safety: PK_INDEX_COLUMN_NAME must be present in record batches yielded by the data part.
let pk_array = pk_index_array(b);
search_next_pk_range(pk_array, self.current_range.end)
})
}
}
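The free function `search_next_pk_range` and the `pk_index_array` helper are defined elsewhere in this file and not shown in this diff. A plausible sketch of the run detection the free function performs, assuming rows are sorted so equal `pk_index` values are contiguous (hypothetical implementation over arrow's `UInt16Array`):

use std::ops::Range;

use arrow::array::{Array, UInt16Array};

fn search_next_pk_range(array: &UInt16Array, start: usize) -> Option<(u16, Range<usize>)> {
    if start >= array.len() {
        return None;
    }
    // Rows are sorted by (pk_index, timestamp), so the run of equal
    // pk_index values beginning at `start` is contiguous.
    let pk = array.value(start);
    let mut end = start + 1;
    while end < array.len() && array.value(end) == pk {
        end += 1;
    }
    Some((pk, start..end))
}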

/// Parquet-encoded `DataPart`.
pub struct ParquetPart {
data: Bytes,
}
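A `ParquetPart` holds a complete parquet file buffered in memory, so the roundtrip needs no filesystem: `ArrowWriter` encodes into a `Vec<u8>`, and `Bytes` implements parquet's `ChunkReader`, letting the reader builder consume it directly. A self-contained sketch of that roundtrip (standalone example with a toy schema, not this crate's code):

use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ArrowWriter;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("ts", DataType::Int64, false)]));
    let column = Arc::new(Int64Array::from(vec![1_i64, 2, 3])) as ArrayRef;
    let batch = RecordBatch::try_new(schema.clone(), vec![column])?;

    // Encode: write the batch into an in-memory buffer; `close` writes the footer.
    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, schema, None)?;
    writer.write(&batch)?;
    writer.close()?;

    // Decode: build a reader directly over the bytes, as `DataPart::read` does.
    let reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buf))?
        .with_batch_size(4)
        .build()?;
    for rb in reader {
        assert_eq!(3, rb?.num_rows());
    }
    Ok(())
}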

#[cfg(test)]
mod tests {
@@ -778,7 +892,10 @@ mod tests {
assert_eq!(4, buffer.num_rows());

let encoder = DataPartEncoder::new(&meta, &[0, 1, 2], None);
- let encoded = encoder.write(&mut buffer).unwrap();
+ let encoded = match encoder.write(&mut buffer).unwrap() {
+ DataPart::Parquet(data) => data.data,
+ };

let s = String::from_utf8_lossy(encoded.as_bytes());
assert!(s.starts_with("PAR1"));
assert!(s.ends_with("PAR1"));
@@ -789,10 +906,10 @@ mod tests {
assert_eq!(3, batch.num_rows());
}

- fn check_buffer_values_equal(iter: &mut DataBufferIter, expected_values: &[Vec<f64>]) {
+ fn check_buffer_values_equal(reader: &mut DataBufferReader, expected_values: &[Vec<f64>]) {
let mut output = Vec::with_capacity(expected_values.len());
- while iter.is_valid() {
- let batch = iter.current_data_batch().slice_record_batch();
+ while reader.is_valid() {
+ let batch = reader.current_data_batch().slice_record_batch();
let values = batch
.column_by_name("v1")
.unwrap()
Expand All @@ -803,7 +920,7 @@ mod tests {
.map(|v| v.unwrap())
.collect::<Vec<_>>();
output.push(values);
- iter.next().unwrap();
+ reader.next().unwrap();
}
assert_eq!(expected_values, output);
}
@@ -842,15 +959,85 @@
2,
);

- let mut iter = buffer.iter(&[0, 1, 3, 2]).unwrap();
+ let mut iter = buffer.read(&[0, 1, 3, 2]).unwrap();
check_buffer_values_equal(&mut iter, &[vec![1.1, 2.1, 3.1], vec![1.0, 2.0, 3.0]]);
}

#[test]
fn test_iter_empty_data_buffer() {
let meta = metadata_for_test();
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
- let mut iter = buffer.iter(&[0, 1, 3, 2]).unwrap();
+ let mut iter = buffer.read(&[0, 1, 3, 2]).unwrap();
check_buffer_values_equal(&mut iter, &[]);
}

fn check_part_values_equal(iter: &mut DataPartReader, expected_values: &[Vec<f64>]) {
let mut output = Vec::with_capacity(expected_values.len());
while iter.is_valid() {
let batch = iter.current_data_batch().slice_record_batch();
let values = batch
.column_by_name("v1")
.unwrap()
.as_any()
.downcast_ref::<Float64Array>()
.unwrap()
.iter()
.map(|v| v.unwrap())
.collect::<Vec<_>>();
output.push(values);
iter.next().unwrap();
}
assert_eq!(expected_values, output);
}

fn check_iter_data_part(weights: &[u16], expected_values: &[Vec<f64>]) {
let meta = metadata_for_test();
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);

write_rows_to_buffer(
&mut buffer,
&meta,
2,
vec![0, 1, 2],
vec![Some(1.0), Some(2.0), Some(3.0)],
2,
);

write_rows_to_buffer(
&mut buffer,
&meta,
3,
vec![1, 2, 3],
vec![Some(1.1), Some(2.1), Some(3.1)],
3,
);

write_rows_to_buffer(
&mut buffer,
&meta,
2,
vec![2, 3],
vec![Some(2.2), Some(2.3)],
4,
);

let encoder = DataPartEncoder::new(&meta, weights, Some(4));
let encoded = encoder.write(&mut buffer).unwrap();

let mut iter = encoded.read().unwrap();
check_part_values_equal(&mut iter, expected_values);
}

#[test]
fn test_iter_data_part() {
check_iter_data_part(
&[0, 1, 2, 3],
&[vec![1.0, 2.0, 3.0, 2.3], vec![1.1, 2.1, 3.1]],
);

check_iter_data_part(
&[3, 2, 1, 0],
&[vec![1.1, 2.1, 3.1], vec![1.0, 2.0, 3.0, 2.3]],
);
}
}
