diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index f141857a5322..3885e3ae8506 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -566,6 +566,12 @@ pub enum Error { error: parquet::errors::ParquetError, location: Location, }, + + #[snafu(display("Failed to iter data part"))] + ReadDataPart { + #[snafu(source)] + error: parquet::errors::ParquetError, + }, } pub type Result = std::result::Result; @@ -669,7 +675,7 @@ impl ErrorExt for Error { FilterRecordBatch { source, .. } => source.status_code(), Upload { .. } => StatusCode::StorageUnavailable, BiError { .. } => StatusCode::Internal, - EncodeMemtable { .. } => StatusCode::Internal, + EncodeMemtable { .. } | ReadDataPart { .. } => StatusCode::Internal, } } diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index e4ed65f8f601..20224b8af23c 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -15,6 +15,7 @@ //! Data part of a shard. use std::cmp::{Ordering, Reverse}; +use std::fmt::{Debug, Formatter}; use std::ops::Range; use std::sync::Arc; @@ -31,6 +32,7 @@ use datatypes::vectors::{ TimestampSecondVector, UInt16Vector, UInt16VectorBuilder, UInt64Vector, UInt64VectorBuilder, UInt8VectorBuilder, }; +use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder}; use parquet::arrow::ArrowWriter; use parquet::file::properties::WriterProperties; use snafu::ResultExt; @@ -140,13 +142,13 @@ impl DataBuffer { /// `freeze` clears the buffers of builders. pub fn freeze(&mut self, pk_weights: &[u16]) -> Result { let encoder = DataPartEncoder::new(&self.metadata, pk_weights, None); - let encoded = encoder.write(self)?; - Ok(DataPart::Parquet(encoded)) + let parts = encoder.write(self)?; + Ok(parts) } /// Reads batches from data buffer without resetting builder's buffers. 
- pub fn iter(&mut self, pk_weights: &[u16]) -> Result { - // todo(hl): control whether to dedup while invoking `iter`. + pub fn read(&mut self, pk_weights: &[u16]) -> Result { + // todo(hl): control whether to dedup while invoking `read`. let batch = data_buffer_to_record_batches( self.data_part_schema.clone(), self, @@ -155,7 +157,7 @@ impl DataBuffer { true, true, )?; - DataBufferIter::new(batch) + DataBufferReader::new(batch) } /// Returns num of rows in data buffer. @@ -287,21 +289,21 @@ fn data_buffer_to_record_batches( } #[derive(Debug)] -pub(crate) struct DataBufferIter { +pub(crate) struct DataBufferReader { batch: RecordBatch, offset: usize, current_batch: Option<(PkIndex, Range)>, } -impl DataBufferIter { +impl DataBufferReader { pub(crate) fn new(batch: RecordBatch) -> Result { - let mut iter = Self { + let mut reader = Self { batch, offset: 0, current_batch: None, }; - iter.next()?; // fill data batch for comparison and merge. - Ok(iter) + reader.next()?; // fill data batch for comparison and merge. + Ok(reader) } pub(crate) fn is_valid(&self) -> bool { @@ -309,7 +311,7 @@ impl DataBufferIter { } /// # Panics - /// If Current iterator is not exhausted. + /// If Current reader is exhausted. pub(crate) fn current_data_batch(&self) -> DataBatch { let (pk_index, range) = self.current_batch.as_ref().unwrap(); DataBatch { @@ -320,13 +322,13 @@ impl DataBufferIter { } /// # Panics - /// If Current iterator is exhausted. + /// If Current reader is exhausted. pub(crate) fn current_pk_index(&self) -> PkIndex { let (pk_index, _) = self.current_batch.as_ref().unwrap(); *pk_index } - /// Advances iterator to next data batch. + /// Advances reader to next data batch. 
pub(crate) fn next(&mut self) -> Result<()> { if self.offset >= self.batch.num_rows() { self.current_batch = None; @@ -506,7 +508,7 @@ impl<'a> DataPartEncoder<'a> { .build() }) } - pub fn write(&self, source: &mut DataBuffer) -> Result { + pub fn write(&self, source: &mut DataBuffer) -> Result { let mut bytes = Vec::with_capacity(1024); let mut writer = ArrowWriter::try_new(&mut bytes, self.schema.clone(), self.writer_props()) .context(error::EncodeMemtableSnafu)?; @@ -519,26 +521,138 @@ impl<'a> DataPartEncoder<'a> { true, )?; writer.write(&rb).context(error::EncodeMemtableSnafu)?; - let _file_meta = writer.close().context(error::EncodeMemtableSnafu)?; - Ok(Bytes::from(bytes)) + let _metadata = writer.close().context(error::EncodeMemtableSnafu)?; + Ok(DataPart::Parquet(ParquetPart { + data: Bytes::from(bytes), + })) } } +/// Data parts under a shard. +pub struct DataParts { + /// The active writing buffer. + pub(crate) active: DataBuffer, + /// immutable (encoded) parts. + pub(crate) frozen: Vec, +} + /// Format of immutable data part. pub enum DataPart { - Parquet(Bytes), + Parquet(ParquetPart), } impl DataPart { fn is_empty(&self) -> bool { match self { - DataPart::Parquet(data) => data.is_empty(), + DataPart::Parquet(p) => p.data.is_empty(), + } + } + + /// Reads frozen data part and yields [DataBatch]es. + pub fn read(&self) -> Result { + match self { + DataPart::Parquet(data_bytes) => DataPartReader::new(data_bytes.data.clone(), None), } } } -/// Data parts under a shard. 
-pub struct DataParts {} +pub struct DataPartReader { + inner: ParquetRecordBatchReader, + current_range: Range, + current_pk_index: Option, + current_batch: Option, +} + +impl Debug for DataPartReader { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DataPartReader") + .field("current_range", &self.current_range) + .field("current_pk_index", &self.current_pk_index) + .finish() + } +} + +impl DataPartReader { + pub fn new(data: Bytes, batch_size: Option) -> Result { + let mut builder = + ParquetRecordBatchReaderBuilder::try_new(data).context(error::ReadDataPartSnafu)?; + if let Some(batch_size) = batch_size { + builder = builder.with_batch_size(batch_size); + } + let parquet_reader = builder.build().context(error::ReadDataPartSnafu)?; + let mut reader = Self { + inner: parquet_reader, + current_pk_index: None, + current_range: 0..0, + current_batch: None, + }; + reader.next()?; + Ok(reader) + } + + /// Returns false if current reader is exhausted. + pub(crate) fn is_valid(&self) -> bool { + self.current_pk_index.is_some() + } + + /// Returns current pk index. + /// + /// # Panics + /// If reader is exhausted. + pub(crate) fn current_pk_index(&self) -> PkIndex { + self.current_pk_index.expect("DataPartReader is exhausted") + } + + /// Returns current data batch of reader. + /// # Panics + /// If reader is exhausted. + pub(crate) fn current_data_batch(&self) -> DataBatch { + let rb = self.current_batch.as_ref().unwrap(); + let pk_index = self.current_pk_index.unwrap(); + let range = self.current_range.clone(); + DataBatch { + pk_index, + rb, + range, + } + } + + pub(crate) fn next(&mut self) -> Result<()> { + if let Some((next_pk, range)) = self.search_next_pk_range() { + // first try to search next pk in current record batch. + self.current_pk_index = Some(next_pk); + self.current_range = range; + } else { + // current record batch reaches eof, fetch next record batch from parquet reader. 
+ if let Some(rb) = self.inner.next() { + let rb = rb.context(error::ComputeArrowSnafu)?; + self.current_range = 0..0; + self.current_batch = Some(rb); + return self.next(); + } else { + // parquet is also exhausted + self.current_pk_index = None; + self.current_batch = None; + } + } + + Ok(()) + } + + /// Searches next primary key along with its offset range inside record batch. + fn search_next_pk_range(&self) -> Option<(PkIndex, Range)> { + self.current_batch.as_ref().and_then(|b| { + // safety: PK_INDEX_COLUMN_NAME must be present in record batch yielded by data part. + let pk_array = pk_index_array(b); + search_next_pk_range(pk_array, self.current_range.end) + }) + } +} + +/// Parquet-encoded `DataPart`. +pub struct ParquetPart { + data: Bytes, +} #[cfg(test)] mod tests { @@ -778,7 +892,10 @@ mod tests { assert_eq!(4, buffer.num_rows()); let encoder = DataPartEncoder::new(&meta, &[0, 1, 2], None); - let encoded = encoder.write(&mut buffer).unwrap(); + let encoded = match encoder.write(&mut buffer).unwrap() { + DataPart::Parquet(data) => data.data, + }; + + let s = String::from_utf8_lossy(encoded.as_bytes()); assert!(s.starts_with("PAR1")); assert!(s.ends_with("PAR1")); @@ -789,10 +906,10 @@ mod tests { assert_eq!(3, batch.num_rows()); } - fn check_buffer_values_equal(iter: &mut DataBufferIter, expected_values: &[Vec]) { + fn check_buffer_values_equal(reader: &mut DataBufferReader, expected_values: &[Vec]) { let mut output = Vec::with_capacity(expected_values.len()); - while iter.is_valid() { - let batch = iter.current_data_batch().slice_record_batch(); + while reader.is_valid() { + let batch = reader.current_data_batch().slice_record_batch(); let values = batch .column_by_name("v1") .unwrap() .as_any() .downcast_ref::() .unwrap() .iter() .map(|v| v.unwrap()) .collect::>(); output.push(values); - iter.next().unwrap(); + reader.next().unwrap(); } assert_eq!(expected_values, output); } @@ -842,7 +959,7 @@ mod tests { 2, ); - let mut iter = buffer.iter(&[0, 1, 3, 2]).unwrap(); + 
let mut iter = buffer.read(&[0, 1, 3, 2]).unwrap(); check_buffer_values_equal(&mut iter, &[vec![1.1, 2.1, 3.1], vec![1.0, 2.0, 3.0]]); } @@ -850,7 +967,77 @@ mod tests { fn test_iter_empty_data_buffer() { let meta = metadata_for_test(); let mut buffer = DataBuffer::with_capacity(meta.clone(), 10); - let mut iter = buffer.iter(&[0, 1, 3, 2]).unwrap(); + let mut iter = buffer.read(&[0, 1, 3, 2]).unwrap(); check_buffer_values_equal(&mut iter, &[]); } + + fn check_part_values_equal(iter: &mut DataPartReader, expected_values: &[Vec]) { + let mut output = Vec::with_capacity(expected_values.len()); + while iter.is_valid() { + let batch = iter.current_data_batch().slice_record_batch(); + let values = batch + .column_by_name("v1") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| v.unwrap()) + .collect::>(); + output.push(values); + iter.next().unwrap(); + } + assert_eq!(expected_values, output); + } + + fn check_iter_data_part(weights: &[u16], expected_values: &[Vec]) { + let meta = metadata_for_test(); + let mut buffer = DataBuffer::with_capacity(meta.clone(), 10); + + write_rows_to_buffer( + &mut buffer, + &meta, + 2, + vec![0, 1, 2], + vec![Some(1.0), Some(2.0), Some(3.0)], + 2, + ); + + write_rows_to_buffer( + &mut buffer, + &meta, + 3, + vec![1, 2, 3], + vec![Some(1.1), Some(2.1), Some(3.1)], + 3, + ); + + write_rows_to_buffer( + &mut buffer, + &meta, + 2, + vec![2, 3], + vec![Some(2.2), Some(2.3)], + 4, + ); + + let encoder = DataPartEncoder::new(&meta, weights, Some(4)); + let encoded = encoder.write(&mut buffer).unwrap(); + + let mut iter = encoded.read().unwrap(); + check_part_values_equal(&mut iter, expected_values); + } + + #[test] + fn test_iter_data_part() { + check_iter_data_part( + &[0, 1, 2, 3], + &[vec![1.0, 2.0, 3.0, 2.3], vec![1.1, 2.1, 3.1]], + ); + + check_iter_data_part( + &[3, 2, 1, 0], + &[vec![1.1, 2.1, 3.1], vec![1.0, 2.0, 3.0, 2.3]], + ); + } }