diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs
index bd6df63d0114..b2be074e421d 100644
--- a/src/mito2/src/error.rs
+++ b/src/mito2/src/error.rs
@@ -294,7 +294,7 @@ pub enum Error {
     },
 
     #[snafu(display(
-        "Failed to deserialize field, source: {} location: {}",
+        "Failed to deserialize field, source: {}, location: {}",
         source,
         location
     ))]
@@ -302,6 +302,9 @@ pub enum Error {
         source: memcomparable::Error,
         location: Location,
     },
+
+    #[snafu(display("Invalid batch, {}, location: {}", reason, location))]
+    InvalidBatch { reason: String, location: Location },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -351,6 +354,7 @@ impl ErrorExt for Error {
             SerializeField { .. } => StatusCode::Internal,
             NotSupportedField { .. } => StatusCode::Unsupported,
             DeserializeField { .. } => StatusCode::Unexpected,
+            InvalidBatch { .. } => StatusCode::InvalidArguments,
         }
     }
 
diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs
index 90b6bf19478c..db4999e41741 100644
--- a/src/mito2/src/read.rs
+++ b/src/mito2/src/read.rs
@@ -14,68 +14,182 @@
 //! Common structs and utilities for reading data.
 
+use std::sync::Arc;
+
 use async_trait::async_trait;
 use common_time::Timestamp;
-use datatypes::vectors::VectorRef;
+use datatypes::vectors::{UInt64Vector, UInt8Vector, Vector, VectorRef};
+use snafu::ensure;
+use store_api::storage::ColumnId;
 
-use crate::error::Result;
+use crate::error::{InvalidBatchSnafu, Result};
 use crate::metadata::RegionMetadataRef;
 
-/// Storage internal representation of a batch of rows.
+/// Storage internal representation of a batch of rows
+/// for a primary key (time series).
 ///
-/// Now the structure of [Batch] is still unstable, all pub fields may be changed.
-#[derive(Debug, Default, PartialEq, Eq, Clone)]
+/// Rows are sorted by primary key, timestamp, sequence desc, op_type desc.
+#[derive(Debug, PartialEq, Clone)]
 pub struct Batch {
-    /// Rows organized in columnar format.
-    pub columns: Vec<VectorRef>,
+    /// Primary key encoded in a comparable form.
+    primary_key: Vec<u8>,
+    /// Timestamps of rows, should be sorted and not null.
+    timestamps: VectorRef,
+    /// Sequences of rows
+    ///
+    /// UInt64 type, not null.
+    sequences: Arc<UInt64Vector>,
+    /// Op types of rows
+    ///
+    /// UInt8 type, not null.
+    op_types: Arc<UInt8Vector>,
+    /// Fields organized in columnar format.
+    fields: Vec<BatchColumn>,
 }
 
 impl Batch {
-    /// Create a new `Batch` from `columns`.
-    ///
-    /// # Panics
-    /// Panics if vectors in `columns` have different length.
-    pub fn new(columns: Vec<VectorRef>) -> Batch {
-        Self::assert_columns(&columns);
+    /// Creates a new batch.
+    pub fn new(
+        primary_key: Vec<u8>,
+        timestamps: VectorRef,
+        sequences: Arc<UInt64Vector>,
+        op_types: Arc<UInt8Vector>,
+        fields: Vec<BatchColumn>,
+    ) -> Result<Batch> {
+        BatchBuilder::new(primary_key, timestamps, sequences, op_types)
+            .with_fields(fields)
+            .build()
+    }
+
+    /// Returns primary key of the batch.
+    pub fn primary_key(&self) -> &[u8] {
+        &self.primary_key
+    }
+
+    /// Returns fields in the batch.
+    pub fn fields(&self) -> &[BatchColumn] {
+        &self.fields
+    }
 
-        Batch { columns }
+    /// Returns timestamps of the batch.
+    pub fn timestamps(&self) -> &VectorRef {
+        &self.timestamps
     }
 
-    /// Returns number of columns in the batch.
-    pub fn num_columns(&self) -> usize {
-        self.columns.len()
+    /// Returns sequences of the batch.
+    pub fn sequences(&self) -> &Arc<UInt64Vector> {
+        &self.sequences
     }
 
-    /// Returns number of rows in the batch.
+    /// Returns op types of the batch.
+    pub fn op_types(&self) -> &Arc<UInt8Vector> {
+        &self.op_types
+    }
+
+    /// Returns the number of rows in the batch.
     pub fn num_rows(&self) -> usize {
-        self.columns.get(0).map(|v| v.len()).unwrap_or(0)
+        // All vectors have the same length so we use
+        // the length of timestamps vector.
+        self.timestamps.len()
     }
 
     /// Returns true if the number of rows in the batch is 0.
     pub fn is_empty(&self) -> bool {
         self.num_rows() == 0
     }
+}
+
+/// A column in a [Batch].
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub struct BatchColumn {
+    /// Id of the column.
+    pub column_id: ColumnId,
+    /// Data of the column.
+    pub data: VectorRef,
+}
+
+/// Builder to build [Batch].
+pub struct BatchBuilder {
+    primary_key: Vec<u8>,
+    timestamps: VectorRef,
+    sequences: Arc<UInt64Vector>,
+    op_types: Arc<UInt8Vector>,
+    fields: Vec<BatchColumn>,
+}
+
+impl BatchBuilder {
+    /// Creates a new [BatchBuilder].
+    pub fn new(
+        primary_key: Vec<u8>,
+        timestamps: VectorRef,
+        sequences: Arc<UInt64Vector>,
+        op_types: Arc<UInt8Vector>,
+    ) -> BatchBuilder {
+        BatchBuilder {
+            primary_key,
+            timestamps,
+            sequences,
+            op_types,
+            fields: Vec::new(),
+        }
+    }
 
-    /// Slice the batch, returning a new batch.
-    ///
-    /// # Panics
-    /// Panics if `offset + length > self.num_rows()`.
-    pub fn slice(&self, offset: usize, length: usize) -> Batch {
-        let columns = self
-            .columns
-            .iter()
-            .map(|v| v.slice(offset, length))
-            .collect();
-        Batch { columns }
-    }
-
-    fn assert_columns(columns: &[VectorRef]) {
-        if columns.is_empty() {
-            return;
+    /// Set all field columns.
+    pub fn with_fields(mut self, fields: Vec<BatchColumn>) -> Self {
+        self.fields = fields;
+        self
+    }
+
+    /// Push a field column.
+    pub fn push_field(&mut self, column: BatchColumn) -> &mut Self {
+        self.fields.push(column);
+        self
+    }
+
+    /// Builds the [Batch].
+    pub fn build(self) -> Result<Batch> {
+        let ts_len = self.timestamps.len();
+        ensure!(
+            self.sequences.len() == ts_len,
+            InvalidBatchSnafu {
+                reason: format!(
+                    "sequence have different len {} != {}",
+                    self.sequences.len(),
+                    ts_len
+                ),
+            }
+        );
+        ensure!(
+            self.op_types.len() == ts_len,
+            InvalidBatchSnafu {
+                reason: format!(
+                    "op type have different len {} != {}",
+                    self.op_types.len(),
+                    ts_len
+                ),
+            }
+        );
+        for column in &self.fields {
+            ensure!(
+                column.data.len() == ts_len,
+                InvalidBatchSnafu {
+                    reason: format!(
+                        "column {} has different len {} != {}",
+                        column.column_id,
+                        column.data.len(),
+                        ts_len
+                    ),
+                }
+            );
         }
-        let length = columns[0].len();
-        assert!(columns.iter().all(|col| col.len() == length));
+        Ok(Batch {
+            primary_key: self.primary_key,
+            timestamps: self.timestamps,
+            sequences: self.sequences,
+            op_types: self.op_types,
+            fields: self.fields,
+        })
     }
 }
 
@@ -110,6 +224,7 @@ impl Source {
         unimplemented!()
     }
 
+    // TODO(yingwen): Maybe remove this method.
     /// Returns statisics of fetched batches.
     pub(crate) fn stats(&self) -> SourceStats {
         unimplemented!()
diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs
index 4b50a575342f..b69196881439 100644
--- a/src/mito2/src/sst/parquet/writer.rs
+++ b/src/mito2/src/sst/parquet/writer.rs
@@ -15,12 +15,14 @@
 //! Parquet writer.
 
 use common_telemetry::debug;
+use datatypes::arrow::record_batch::RecordBatch;
 use object_store::ObjectStore;
 use parquet::basic::{Compression, Encoding, ZstdLevel};
 use parquet::file::metadata::KeyValue;
 use parquet::file::properties::WriterProperties;
+use snafu::ResultExt;
 
-use crate::error::Result;
+use crate::error::{NewRecordBatchSnafu, Result};
 use crate::read::Source;
 use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
 use crate::sst::stream_writer::BufferedWriter;
@@ -64,17 +66,28 @@ impl<'a> ParquetWriter<'a> {
 
         let writer_props = props_builder.build();
 
+        let arrow_schema = metadata.schema.arrow_schema();
         let mut buffered_writer = BufferedWriter::try_new(
             self.file_path.to_string(),
             self.object_store.clone(),
-            &metadata.schema,
+            arrow_schema.clone(),
             Some(writer_props),
             opts.write_buffer_size.as_bytes() as usize,
         )
         .await?;
 
         while let Some(batch) = self.source.next_batch().await? {
-            buffered_writer.write(&batch).await?;
+            let arrow_batch = RecordBatch::try_new(
+                arrow_schema.clone(),
+                batch
+                    .fields()
+                    .iter()
+                    .map(|v| v.data.to_arrow_array())
+                    .collect::<Vec<_>>(),
+            )
+            .context(NewRecordBatchSnafu)?;
+
+            buffered_writer.write(&arrow_batch).await?;
         }
 
         // Get stats from the source.
         let stats = self.source.stats();
diff --git a/src/mito2/src/sst/stream_writer.rs b/src/mito2/src/sst/stream_writer.rs
index 62fa4df7e6e1..005b533443b6 100644
--- a/src/mito2/src/sst/stream_writer.rs
+++ b/src/mito2/src/sst/stream_writer.rs
@@ -17,9 +17,8 @@ use std::pin::Pin;
 
 use common_datasource::buffered_writer::LazyBufferedWriter;
 use common_datasource::share_buffer::SharedBuffer;
-use datatypes::arrow;
+use datatypes::arrow::datatypes::SchemaRef;
 use datatypes::arrow::record_batch::RecordBatch;
-use datatypes::schema::SchemaRef;
 use object_store::ObjectStore;
 use parquet::arrow::ArrowWriter;
 use parquet::file::properties::WriterProperties;
@@ -27,14 +26,13 @@ use parquet::format::FileMetaData;
 use snafu::ResultExt;
 
 use crate::error;
-use crate::error::{NewRecordBatchSnafu, WriteParquetSnafu};
-use crate::read::Batch;
+use crate::error::WriteParquetSnafu;
 
 /// Parquet writer that buffers row groups in memory and writes buffered data to an underlying
 /// storage by chunks to reduce memory consumption.
 pub struct BufferedWriter {
     inner: InnerBufferedWriter,
-    arrow_schema: arrow::datatypes::SchemaRef,
+    arrow_schema: SchemaRef,
 }
 
 type InnerBufferedWriter = LazyBufferedWriter<
@@ -56,11 +54,10 @@ impl BufferedWriter {
     pub async fn try_new(
         path: String,
         store: ObjectStore,
-        schema: &SchemaRef,
+        arrow_schema: SchemaRef,
         props: Option<WriterProperties>,
         buffer_threshold: usize,
     ) -> error::Result<Self> {
-        let arrow_schema = schema.arrow_schema();
         let buffer = SharedBuffer::with_capacity(buffer_threshold);
         let arrow_writer = ArrowWriter::try_new(buffer.clone(), arrow_schema.clone(), props)
@@ -82,24 +79,14 @@ impl BufferedWriter {
                     })
                 }),
             ),
-            arrow_schema: arrow_schema.clone(),
+            arrow_schema,
        })
     }
 
     /// Write a record batch to stream writer.
-    pub async fn write(&mut self, batch: &Batch) -> error::Result<()> {
-        let arrow_batch = RecordBatch::try_new(
-            self.arrow_schema.clone(),
-            batch
-                .columns
-                .iter()
-                .map(|v| v.to_arrow_array())
-                .collect::<Vec<_>>(),
-        )
-        .context(NewRecordBatchSnafu)?;
-
+    pub async fn write(&mut self, arrow_batch: &RecordBatch) -> error::Result<()> {
         self.inner
-            .write(&arrow_batch)
+            .write(arrow_batch)
             .await
             .context(error::WriteBufferSnafu)?;
         self.inner
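
A minimal usage sketch of the new builder API introduced by this patch. The build_batch helper and its parameters are hypothetical; the vectors are assumed to be decoded elsewhere, and only BatchBuilder, BatchColumn, Batch, and the crate's Result type come from the code above.

use std::sync::Arc;

use datatypes::vectors::{UInt64Vector, UInt8Vector, VectorRef};
use store_api::storage::ColumnId;

use crate::error::Result;
use crate::read::{Batch, BatchBuilder, BatchColumn};

/// Hypothetical helper: assembles a [Batch] for a single time series from
/// vectors decoded elsewhere. Every vector must have the same length as
/// `timestamps`, otherwise `build` returns an InvalidBatch error.
fn build_batch(
    primary_key: Vec<u8>,
    timestamps: VectorRef,
    sequences: Arc<UInt64Vector>,
    op_types: Arc<UInt8Vector>,
    field_id: ColumnId,
    field_data: VectorRef,
) -> Result<Batch> {
    let mut builder = BatchBuilder::new(primary_key, timestamps, sequences, op_types);
    // Field columns can be pushed one at a time; with_fields() sets them all at once.
    builder.push_field(BatchColumn {
        column_id: field_id,
        data: field_data,
    });
    builder.build()
}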