diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 2ea4aca0b5f9..169a77c8ad95 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -309,8 +309,28 @@ pub enum Error { location: Location, }, + #[snafu(display("Invalid parquet SST file {}, reason: {}", file, reason))] + InvalidParquet { + file: String, + reason: String, + location: Location, + }, + #[snafu(display("Invalid batch, {}, location: {}", reason, location))] InvalidBatch { reason: String, location: Location }, + + #[snafu(display("Invalid arrow record batch, {}, location: {}", reason, location))] + InvalidRecordBatch { reason: String, location: Location }, + + #[snafu(display( + "Failed to convert array to vector, location: {}, source: {}", + location, + source + ))] + ConvertVector { + location: Location, + source: datatypes::error::Error, + }, } pub type Result = std::result::Result; @@ -341,7 +361,8 @@ impl ErrorExt for Error { | NewRecordBatch { .. } | RegionNotFound { .. } | RegionCorrupted { .. } - | CreateDefault { .. } => StatusCode::Unexpected, + | CreateDefault { .. } + | InvalidParquet { .. } => StatusCode::Unexpected, InvalidScanIndex { .. } | InvalidMeta { .. } | InvalidSchema { .. } @@ -362,6 +383,8 @@ impl ErrorExt for Error { NotSupportedField { .. } => StatusCode::Unsupported, DeserializeField { .. } => StatusCode::Unexpected, InvalidBatch { .. } => StatusCode::InvalidArguments, + InvalidRecordBatch { .. } => StatusCode::InvalidArguments, + ConvertVector { source, .. } => source.status_code(), } } diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index 53335efe6701..f9f919581ac6 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -18,12 +18,15 @@ use std::sync::Arc; use async_trait::async_trait; use common_time::Timestamp; -use datatypes::vectors::{UInt64Vector, UInt8Vector, Vector, VectorRef}; -use snafu::ensure; +use datatypes::arrow; +use datatypes::arrow::array::ArrayRef; +use datatypes::prelude::DataType; +use datatypes::vectors::{Helper, UInt64Vector, UInt8Vector, Vector, VectorRef}; +use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; -use crate::error::{InvalidBatchSnafu, Result}; +use crate::error::{ConvertVectorSnafu, InvalidBatchSnafu, Result}; /// Storage internal representation of a batch of rows /// for a primary key (time series). @@ -56,7 +59,7 @@ impl Batch { op_types: Arc, fields: Vec, ) -> Result { - BatchBuilder::new(primary_key, timestamps, sequences, op_types) + BatchBuilder::with_required_columns(primary_key, timestamps, sequences, op_types) .with_fields(fields) .build() } @@ -111,15 +114,26 @@ pub struct BatchColumn { /// Builder to build [Batch]. pub struct BatchBuilder { primary_key: Vec, - timestamps: VectorRef, - sequences: Arc, - op_types: Arc, + timestamps: Option, + sequences: Option>, + op_types: Option>, fields: Vec, } impl BatchBuilder { - /// Creates a new [BatchBuilder]. - pub fn new( + /// Creates a new [BatchBuilder] with primary key. + pub fn new(primary_key: Vec) -> BatchBuilder { + BatchBuilder { + primary_key, + timestamps: None, + sequences: None, + op_types: None, + fields: Vec::new(), + } + } + + /// Creates a new [BatchBuilder] with all required columns. 
+ pub fn with_required_columns( primary_key: Vec, timestamps: VectorRef, sequences: Arc, @@ -127,9 +141,9 @@ impl BatchBuilder { ) -> BatchBuilder { BatchBuilder { primary_key, - timestamps, - sequences, - op_types, + timestamps: Some(timestamps), + sequences: Some(sequences), + op_types: Some(op_types), fields: Vec::new(), } } @@ -146,25 +160,90 @@ impl BatchBuilder { self } + /// Push an array as a field. + pub fn push_field_array(&mut self, column_id: ColumnId, array: ArrayRef) -> Result<&mut Self> { + let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?; + self.fields.push(BatchColumn { + column_id, + data: vector, + }); + + Ok(self) + } + + /// Try to set an array as timestamps. + pub fn timestamps_array(&mut self, array: ArrayRef) -> Result<&mut Self> { + let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?; + ensure!( + vector.data_type().is_timestamp_compatible(), + InvalidBatchSnafu { + reason: format!("{:?} is a timestamp type", vector.data_type()), + } + ); + + self.timestamps = Some(vector); + Ok(self) + } + + /// Try to set an array as sequences. + pub fn sequences_array(&mut self, array: ArrayRef) -> Result<&mut Self> { + ensure!( + *array.data_type() == arrow::datatypes::DataType::UInt64, + InvalidBatchSnafu { + reason: "sequence array is not UInt64 type", + } + ); + // Safety: The cast must success as we have ensured it is uint64 type. + let vector = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap()); + self.sequences = Some(vector); + + Ok(self) + } + + /// Try to set an array as op types. + pub fn op_types_array(&mut self, array: ArrayRef) -> Result<&mut Self> { + ensure!( + *array.data_type() == arrow::datatypes::DataType::UInt8, + InvalidBatchSnafu { + reason: "sequence array is not UInt8 type", + } + ); + // Safety: The cast must success as we have ensured it is uint64 type. + let vector = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap()); + self.op_types = Some(vector); + + Ok(self) + } + /// Builds the [Batch]. pub fn build(self) -> Result { - let ts_len = self.timestamps.len(); + let timestamps = self.timestamps.context(InvalidBatchSnafu { + reason: "missing timestamps", + })?; + let sequences = self.sequences.context(InvalidBatchSnafu { + reason: "missing sequences", + })?; + let op_types = self.op_types.context(InvalidBatchSnafu { + reason: "missing op_types", + })?; + + let ts_len = timestamps.len(); ensure!( - self.sequences.len() == ts_len, + sequences.len() == ts_len, InvalidBatchSnafu { reason: format!( "sequence have different len {} != {}", - self.sequences.len(), + sequences.len(), ts_len ), } ); ensure!( - self.op_types.len() == ts_len, + op_types.len() == ts_len, InvalidBatchSnafu { reason: format!( "op type have different len {} != {}", - self.op_types.len(), + op_types.len(), ts_len ), } @@ -185,9 +264,9 @@ impl BatchBuilder { Ok(Batch { primary_key: self.primary_key, - timestamps: self.timestamps, - sequences: self.sequences, - op_types: self.op_types, + timestamps, + sequences, + op_types, fields: self.fields, }) } @@ -232,8 +311,12 @@ impl Source { } /// Async batch reader. +/// +/// The reader must guarantee [Batch]es returned by it have the same schema. #[async_trait] pub trait BatchReader: Send { + // TODO(yingwen): fields of the batch returned. + /// Fetch next [Batch]. 
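As an aside, a minimal sketch (not part of the patch) of how these new builder setters are meant to be chained when reassembling a `Batch` from arrow arrays inside the crate; the column id and the literal values are made up for illustration:

```rust
use std::sync::Arc;

use datatypes::arrow::array::{
    ArrayRef, Int64Array, TimestampMillisecondArray, UInt64Array, UInt8Array,
};

use crate::error::Result;
use crate::read::{Batch, BatchBuilder};

/// Assembles a [Batch] for one primary key from plain arrow arrays,
/// e.g. slices cut out of a parquet `RecordBatch`.
fn batch_from_arrays(primary_key: Vec<u8>) -> Result<Batch> {
    let timestamps: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
    let sequences: ArrayRef = Arc::new(UInt64Array::from(vec![1, 1]));
    let op_types: ArrayRef = Arc::new(UInt8Array::from(vec![1, 1]));
    let field: ArrayRef = Arc::new(Int64Array::from(vec![42, 43]));

    let mut builder = BatchBuilder::new(primary_key);
    builder
        .timestamps_array(timestamps)? // must be a timestamp-compatible array
        .sequences_array(sequences)?   // must be UInt64
        .op_types_array(op_types)?     // must be UInt8
        .push_field_array(2, field)?;  // column id 2 is a hypothetical field column
    // `build` rejects missing timestamps/sequences/op_types and mismatched lengths.
    builder.build()
}
```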
/// /// Returns `Ok(None)` when the reader has reached its end and calling `next_batch()` diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index c7de27199adf..eb1d5068aa22 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -14,6 +14,7 @@ //! SST in parquet format. +mod format; mod reader; mod writer; diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs new file mode 100644 index 000000000000..fe665ac8d017 --- /dev/null +++ b/src/mito2/src/sst/parquet/format.rs @@ -0,0 +1,581 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Format to store in parquet. +//! +//! We store three internal columns in parquet: +//! - `__primary_key`, the primary key of the row (tags). Type: dictionary(uint16, binary) +//! - `__sequence`, the sequence number of a row. Type: uint64 +//! - `__op_type`, the op type of the row. Type: uint8 +//! +//! The schema of a parquet file is: +//! ```text +//! field 0, field 1, ..., field N, time index, primary key, sequence, op type +//! ``` + +use std::collections::HashMap; +use std::sync::Arc; + +use api::v1::SemanticType; +use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt16Array}; +use datatypes::arrow::datatypes::{ + DataType, Field, FieldRef, Fields, Schema, SchemaRef, UInt16Type, +}; +use datatypes::arrow::record_batch::RecordBatch; +use datatypes::vectors::{Helper, Vector}; +use snafu::{ensure, OptionExt, ResultExt}; +use store_api::metadata::{RegionMetadata, RegionMetadataRef}; +use store_api::storage::consts::{ + OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME, +}; +use store_api::storage::ColumnId; + +use crate::error::{ + ConvertVectorSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result, +}; +use crate::read::{Batch, BatchBuilder, BatchColumn}; + +/// Number of columns that have fixed positions. +/// +/// Contains: time index and internal columns. +const FIXED_POS_COLUMN_NUM: usize = 4; + +/// Helper for writing the SST format. +pub(crate) struct WriteFormat { + metadata: RegionMetadataRef, + /// SST file schema. + arrow_schema: SchemaRef, +} + +impl WriteFormat { + /// Creates a new helper. + pub(crate) fn new(metadata: RegionMetadataRef) -> WriteFormat { + let arrow_schema = to_sst_arrow_schema(&metadata); + WriteFormat { + metadata, + arrow_schema, + } + } + + /// Gets the arrow schema to store in parquet. + pub(crate) fn arrow_schema(&self) -> SchemaRef { + self.arrow_schema.clone() + } + + /// Convert `batch` to a arrow record batch to store in parquet. + pub(crate) fn convert_batch(&self, batch: &Batch) -> Result { + debug_assert_eq!( + batch.fields().len() + FIXED_POS_COLUMN_NUM, + self.arrow_schema.fields().len() + ); + let mut columns = Vec::with_capacity(batch.fields().len() + FIXED_POS_COLUMN_NUM); + // Store all fields first. 
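To make the layout described in the module docs concrete, this is the arrow schema one would expect for a hypothetical region with two fields `f0`/`f1` and a millisecond time index `ts` (a sketch with assumed names and types; the last four columns always sit at the tail):

```rust
use datatypes::arrow::datatypes::{DataType, Field, Schema, TimeUnit};

/// Column order expected in an SST file: fields first, then the time index,
/// then the three internal columns (the last four positions are fixed).
fn example_sst_schema() -> Schema {
    Schema::new(vec![
        Field::new("f0", DataType::Float64, true),
        Field::new("f1", DataType::Float64, true),
        Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), false),
        Field::new_dictionary("__primary_key", DataType::UInt16, DataType::Binary, false),
        Field::new("__sequence", DataType::UInt64, false),
        Field::new("__op_type", DataType::UInt8, false),
    ])
}
```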
+ for (column, column_metadata) in batch.fields().iter().zip(self.metadata.field_columns()) { + ensure!( + column.column_id == column_metadata.column_id, + InvalidBatchSnafu { + reason: format!( + "Batch has column {} but metadata has column {}", + column.column_id, column_metadata.column_id + ), + } + ); + + columns.push(column.data.to_arrow_array()); + } + // Add time index column. + columns.push(batch.timestamps().to_arrow_array()); + // Add internal columns: primary key, sequences, op types. + columns.push(new_primary_key_array(batch.primary_key(), batch.num_rows())); + columns.push(batch.sequences().to_arrow_array()); + columns.push(batch.op_types().to_arrow_array()); + + RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu) + } +} + +/// Helper for reading the SST format. +pub(crate) struct ReadFormat { + metadata: RegionMetadataRef, + /// SST file schema. + arrow_schema: SchemaRef, + // Field column id to its index in `schema` (SST schema). + field_id_to_index: HashMap, +} + +impl ReadFormat { + /// Creates a helper with existing `metadata`. + pub(crate) fn new(metadata: RegionMetadataRef) -> ReadFormat { + let field_id_to_index: HashMap<_, _> = metadata + .field_columns() + .enumerate() + .map(|(index, column)| (column.column_id, index)) + .collect(); + let arrow_schema = to_sst_arrow_schema(&metadata); + + ReadFormat { + metadata, + arrow_schema, + field_id_to_index, + } + } + + /// Gets the converted arrow schema. + pub(crate) fn arrow_schema(&self) -> &SchemaRef { + &self.arrow_schema + } + + /// Gets sorted projection indices to read `columns` from parquet files. + /// + /// This function ignores columns not in `metadata` to for compatibility between + /// different schemas. + pub(crate) fn projection_indices( + &self, + columns: impl IntoIterator, + ) -> Vec { + let mut indices: Vec<_> = columns + .into_iter() + .filter_map(|column_id| { + // Only apply projection to fields. + self.field_id_to_index.get(&column_id).copied() + }) + // We need to add all fixed position columns. + .chain( + self.arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM + ..self.arrow_schema.fields.len(), + ) + .collect(); + indices.sort_unstable(); + indices + } + + /// Convert a arrow record batch into `batches`. + /// + /// Note that the `record_batch` may only contains a subset of columns if it is projected. + pub(crate) fn convert_record_batch( + &self, + record_batch: &RecordBatch, + batches: &mut Vec, + ) -> Result<()> { + debug_assert!(batches.is_empty()); + + // The record batch must has time index and internal columns. + ensure!( + record_batch.num_columns() >= FIXED_POS_COLUMN_NUM, + InvalidRecordBatchSnafu { + reason: format!( + "record batch only has {} columns", + record_batch.num_columns() + ), + } + ); + + let mut fixed_pos_columns = record_batch + .columns() + .iter() + .rev() + .take(FIXED_POS_COLUMN_NUM); + // Safety: We have checked the column number. + let op_type_array = fixed_pos_columns.next().unwrap(); + let sequence_array = fixed_pos_columns.next().unwrap(); + let pk_array = fixed_pos_columns.next().unwrap(); + let ts_array = fixed_pos_columns.next().unwrap(); + let field_batch_columns = self.get_field_batch_columns(record_batch)?; + + // Compute primary key offsets. 
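A standalone rendering of the projection rule above, with assumed column ids and positions (a sketch, not the crate's API):

```rust
use std::collections::HashMap;

const FIXED_POS_COLUMN_NUM: usize = 4;

/// Maps requested column ids to parquet leaf indices: only field columns are
/// projected, and the trailing time index + internal columns are always kept.
fn projection_indices(
    field_id_to_index: &HashMap<u32, usize>, // column id -> position among fields
    total_columns: usize,                    // number of columns in the SST schema
    column_ids: &[u32],                      // requested ids; tags and ts are ignored here
) -> Vec<usize> {
    let mut indices: Vec<_> = column_ids
        .iter()
        .filter_map(|id| field_id_to_index.get(id).copied())
        .chain(total_columns - FIXED_POS_COLUMN_NUM..total_columns)
        .collect();
    indices.sort_unstable();
    indices
}

// With fields {4 => 0, 2 => 1} in a 6-column SST schema, requesting column 4
// yields [0, 2, 3, 4, 5]: the field itself plus ts, __primary_key, __sequence, __op_type.
```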
+ let pk_dict_array = pk_array + .as_any() + .downcast_ref::>() + .with_context(|| InvalidRecordBatchSnafu { + reason: format!("primary key array should not be {:?}", pk_array.data_type()), + })?; + let offsets = primary_key_offsets(pk_dict_array)?; + if offsets.is_empty() { + return Ok(()); + } + + // Split record batch according to pk offsets. + let keys = pk_dict_array.keys(); + let pk_values = pk_dict_array + .values() + .as_any() + .downcast_ref::() + .with_context(|| InvalidRecordBatchSnafu { + reason: format!( + "values of primary key array should not be {:?}", + pk_dict_array.values().data_type() + ), + })?; + for (i, start) in offsets[..offsets.len() - 1].iter().enumerate() { + let end = offsets[i + 1]; + let rows_in_batch = end - start; + let dict_key = keys.value(*start); + let primary_key = pk_values.value(dict_key.into()).to_vec(); + + let mut builder = BatchBuilder::new(primary_key); + builder + .timestamps_array(ts_array.slice(*start, rows_in_batch))? + .sequences_array(sequence_array.slice(*start, rows_in_batch))? + .op_types_array(op_type_array.slice(*start, rows_in_batch))?; + // Push all fields + for batch_column in &field_batch_columns { + builder.push_field(BatchColumn { + column_id: batch_column.column_id, + data: batch_column.data.slice(*start, rows_in_batch), + }); + } + + let batch = builder.build()?; + batches.push(batch); + } + + Ok(()) + } + + /// Get fields from `record_batch`. + fn get_field_batch_columns(&self, record_batch: &RecordBatch) -> Result> { + record_batch + .columns() + .iter() + .zip(record_batch.schema().fields()) + .take(record_batch.num_columns() - FIXED_POS_COLUMN_NUM) // Take all field columns. + .map(|(array, field)| { + let vector = Helper::try_into_vector(array.clone()).context(ConvertVectorSnafu)?; + let column = self + .metadata + .column_by_name(field.name()) + .with_context(|| InvalidRecordBatchSnafu { + reason: format!("column {} not found in metadata", field.name()), + })?; + + Ok(BatchColumn { + column_id: column.column_id, + data: vector, + }) + }) + .collect() + } +} + +/// Gets the arrow schema to store in parquet. +fn to_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef { + let fields = Fields::from_iter( + metadata + .schema + .arrow_schema() + .fields() + .iter() + .zip(&metadata.column_metadatas) + .filter_map(|(field, column_meta)| { + if column_meta.semantic_type == SemanticType::Field { + Some(field.clone()) + } else { + // We have fixed positions for tags (primary key) and time index. + None + } + }) + .chain([metadata.time_index_field()]) + .chain(internal_fields()), + ); + + Arc::new(Schema::new(fields)) +} + +/// Compute offsets of different primary keys in the array. +fn primary_key_offsets(pk_dict_array: &DictionaryArray) -> Result> { + if pk_dict_array.is_empty() { + return Ok(Vec::new()); + } + + // Init offsets. + let mut offsets = vec![0]; + let keys = pk_dict_array.keys(); + // We know that primary keys are always not null so we iterate `keys.values()` directly. + let pk_indices = keys.values(); + for (i, key) in pk_indices.iter().take(keys.len() - 1).enumerate() { + // Compare each key with next key + if *key != pk_indices[i + 1] { + // We meet a new key, push the next index as end of the offset. + offsets.push(i + 1); + } + } + offsets.push(keys.len()); + + Ok(offsets) +} + +/// Fields for internal columns. +fn internal_fields() -> [FieldRef; 3] { + // Internal columns are always not null. 
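The offset computation relies on rows being sorted by primary key, so equal dictionary keys form contiguous runs. A self-contained sketch of the same idea on a tiny, made-up key array:

```rust
use std::sync::Arc;

use datatypes::arrow::array::{BinaryArray, DictionaryArray, UInt16Array};
use datatypes::arrow::datatypes::UInt16Type;

/// Keys [0, 0, 1, 1, 1] split into offsets [0, 2, 5]: rows 0..2 belong to the
/// first primary key, rows 2..5 to the second.
fn example_offsets() -> Vec<usize> {
    let values = Arc::new(BinaryArray::from_iter_values([b"a".as_slice(), b"b".as_slice()]));
    let keys = UInt16Array::from(vec![0u16, 0, 1, 1, 1]);
    let pk = DictionaryArray::<UInt16Type>::new(keys, values);

    // Primary keys are never null, so we can walk the raw key buffer directly.
    let key_values = pk.keys().values();
    let mut offsets = vec![0];
    for i in 0..key_values.len() - 1 {
        if key_values[i] != key_values[i + 1] {
            offsets.push(i + 1);
        }
    }
    offsets.push(key_values.len());
    offsets
}
```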
+ [ + Arc::new(Field::new_dictionary( + PRIMARY_KEY_COLUMN_NAME, + DataType::UInt16, + DataType::Binary, + false, + )), + Arc::new(Field::new(SEQUENCE_COLUMN_NAME, DataType::UInt64, false)), + Arc::new(Field::new(OP_TYPE_COLUMN_NAME, DataType::UInt8, false)), + ] +} + +/// Creates a new array for specific `primary_key`. +fn new_primary_key_array(primary_key: &[u8], num_rows: usize) -> ArrayRef { + let values = Arc::new(BinaryArray::from_iter_values([primary_key])); + let keys = UInt16Array::from_value(0, num_rows); + + // Safety: The key index is valid. + Arc::new(DictionaryArray::new(keys, values)) +} + +#[cfg(test)] +mod tests { + use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt64Array, UInt8Array}; + use datatypes::arrow::datatypes::TimeUnit; + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::ColumnSchema; + use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt64Vector, UInt8Vector}; + use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; + use store_api::storage::RegionId; + + use super::*; + + const TEST_SEQUENCE: u64 = 1; + const TEST_OP_TYPE: u8 = 1; + + fn build_test_region_metadata() -> RegionMetadataRef { + let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("tag0", ConcreteDataType::int64_datatype(), true), + semantic_type: SemanticType::Tag, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "field1", + ConcreteDataType::int64_datatype(), + true, + ), + semantic_type: SemanticType::Field, + column_id: 4, // We change the order of fields columns. + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("tag1", ConcreteDataType::int64_datatype(), true), + semantic_type: SemanticType::Tag, + column_id: 3, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "field0", + ConcreteDataType::int64_datatype(), + true, + ), + semantic_type: SemanticType::Field, + column_id: 2, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 5, + }) + .primary_key(vec![1, 3]); + Arc::new(builder.build().unwrap()) + } + + fn build_test_arrow_schema() -> SchemaRef { + let fields = vec![ + Field::new("field1", DataType::Int64, true), + Field::new("field0", DataType::Int64, true), + Field::new( + "ts", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + Field::new( + "__primary_key", + DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Binary)), + false, + ), + Field::new("__sequence", DataType::UInt64, false), + Field::new("__op_type", DataType::UInt8, false), + ]; + Arc::new(Schema::new(fields)) + } + + fn new_batch(primary_key: &[u8], start_ts: i64, start_field: i64, num_rows: usize) -> Batch { + let ts_values = (0..num_rows).map(|i| start_ts + i as i64); + let timestamps = Arc::new(TimestampMillisecondVector::from_values(ts_values)); + let sequences = Arc::new(UInt64Vector::from_vec(vec![TEST_SEQUENCE; num_rows])); + let op_types = Arc::new(UInt8Vector::from_vec(vec![TEST_OP_TYPE; num_rows])); + let fields = vec![ + BatchColumn { + column_id: 4, + data: Arc::new(Int64Vector::from_vec(vec![start_field; num_rows])), + }, // field1 + BatchColumn { + column_id: 2, + data: Arc::new(Int64Vector::from_vec(vec![start_field + 1; num_rows])), + }, 
// field0 + ]; + + BatchBuilder::with_required_columns(primary_key.to_vec(), timestamps, sequences, op_types) + .with_fields(fields) + .build() + .unwrap() + } + + #[test] + fn test_to_sst_arrow_schema() { + let metadata = build_test_region_metadata(); + let write_format = WriteFormat::new(metadata); + assert_eq!(build_test_arrow_schema(), write_format.arrow_schema()); + } + + #[test] + fn test_new_primary_key_array() { + let array = new_primary_key_array(b"test", 3); + let expect = build_test_pk_array(&[(b"test".to_vec(), 3)]) as ArrayRef; + assert_eq!(&expect, &array); + } + + fn build_test_pk_array(pk_row_nums: &[(Vec, usize)]) -> Arc> { + let values = Arc::new(BinaryArray::from_iter_values( + pk_row_nums.iter().map(|v| &v.0), + )); + let mut keys = vec![]; + for (index, num_rows) in pk_row_nums.iter().map(|v| v.1).enumerate() { + keys.extend(std::iter::repeat(index as u16).take(num_rows)); + } + let keys = UInt16Array::from(keys); + Arc::new(DictionaryArray::new(keys, values)) + } + + #[test] + fn test_convert_batch() { + let metadata = build_test_region_metadata(); + let write_format = WriteFormat::new(metadata); + + let num_rows = 4; + let batch = new_batch(b"test", 1, 2, num_rows); + let columns: Vec = vec![ + Arc::new(Int64Array::from(vec![2; num_rows])), // field1 + Arc::new(Int64Array::from(vec![3; num_rows])), // field0 + Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts + build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key + Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // sequence + Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type + ]; + let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap(); + + let actual = write_format.convert_batch(&batch).unwrap(); + assert_eq!(expect_record, actual); + } + + #[test] + fn test_projection_indices() { + let metadata = build_test_region_metadata(); + let read_format = ReadFormat::new(metadata); + // Only read tag1 + assert_eq!(vec![2, 3, 4, 5], read_format.projection_indices([3])); + // Only read field1 + assert_eq!(vec![0, 2, 3, 4, 5], read_format.projection_indices([4])); + // Only read ts + assert_eq!(vec![2, 3, 4, 5], read_format.projection_indices([5])); + // Read field0, tag0, ts + assert_eq!( + vec![1, 2, 3, 4, 5], + read_format.projection_indices([2, 1, 5]) + ); + } + + #[test] + fn test_empty_primary_key_offsets() { + let array = build_test_pk_array(&[]); + assert!(primary_key_offsets(&array).unwrap().is_empty()); + } + + #[test] + fn test_primary_key_offsets_one_series() { + let array = build_test_pk_array(&[(b"one".to_vec(), 1)]); + assert_eq!(vec![0, 1], primary_key_offsets(&array).unwrap()); + + let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 1)]); + assert_eq!(vec![0, 1, 2], primary_key_offsets(&array).unwrap()); + + let array = build_test_pk_array(&[ + (b"one".to_vec(), 1), + (b"two".to_vec(), 1), + (b"three".to_vec(), 1), + ]); + assert_eq!(vec![0, 1, 2, 3], primary_key_offsets(&array).unwrap()); + } + + #[test] + fn test_primary_key_offsets_multi_series() { + let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 3)]); + assert_eq!(vec![0, 1, 4], primary_key_offsets(&array).unwrap()); + + let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 1)]); + assert_eq!(vec![0, 3, 4], primary_key_offsets(&array).unwrap()); + + let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 3)]); + assert_eq!(vec![0, 3, 6], primary_key_offsets(&array).unwrap()); + 
} + + #[test] + fn test_convert_empty_record_batch() { + let metadata = build_test_region_metadata(); + let arrow_schema = build_test_arrow_schema(); + let read_format = ReadFormat::new(metadata); + assert_eq!(arrow_schema, *read_format.arrow_schema()); + + let record_batch = RecordBatch::new_empty(arrow_schema); + let mut batches = vec![]; + read_format + .convert_record_batch(&record_batch, &mut batches) + .unwrap(); + assert!(batches.is_empty()); + } + + #[test] + fn test_convert_record_batch() { + let metadata = build_test_region_metadata(); + let read_format = ReadFormat::new(metadata); + + let columns: Vec = vec![ + Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1 + Arc::new(Int64Array::from(vec![2, 2, 11, 11])), // field0 + Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), // ts + build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), // primary key + Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), // sequence + Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), // op type + ]; + let arrow_schema = build_test_arrow_schema(); + let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap(); + let mut batches = vec![]; + read_format + .convert_record_batch(&record_batch, &mut batches) + .unwrap(); + + assert_eq!( + vec![new_batch(b"one", 1, 1, 2), new_batch(b"two", 11, 10, 2)], + batches + ); + } +} diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index c750f2516272..37f9c0168d9e 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -14,6 +14,8 @@ //! Parquet reader. +use std::sync::Arc; + use async_compat::CompatExt; use async_trait::async_trait; use common_time::range::TimestampRange; @@ -21,15 +23,22 @@ use datatypes::arrow::record_batch::RecordBatch; use futures::stream::BoxStream; use futures::TryStreamExt; use object_store::ObjectStore; -use parquet::arrow::ParquetRecordBatchStreamBuilder; +use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; use parquet::errors::ParquetError; -use snafu::ResultExt; +use parquet::format::KeyValue; +use snafu::{ensure, OptionExt, ResultExt}; +use store_api::metadata::RegionMetadata; +use store_api::storage::ColumnId; use table::predicate::Predicate; use tokio::io::BufReader; -use crate::error::{OpenDalSnafu, ReadParquetSnafu, Result}; +use crate::error::{ + InvalidMetadataSnafu, InvalidParquetSnafu, OpenDalSnafu, ReadParquetSnafu, Result, +}; use crate::read::{Batch, BatchReader}; use crate::sst::file::FileHandle; +use crate::sst::parquet::format::ReadFormat; +use crate::sst::parquet::PARQUET_METADATA_KEY; /// Parquet SST reader builder. pub struct ParquetReaderBuilder { @@ -38,6 +47,7 @@ pub struct ParquetReaderBuilder { object_store: ObjectStore, predicate: Option, time_range: Option, + projection: Option>, } impl ParquetReaderBuilder { @@ -53,6 +63,7 @@ impl ParquetReaderBuilder { object_store, predicate: None, time_range: None, + projection: None, } } @@ -68,65 +79,56 @@ impl ParquetReaderBuilder { self } - /// Builds a [ParquetReader]. - pub fn build(self) -> ParquetReader { + /// Attaches the projection to the builder. + /// + /// The reader only applies the projection to fields. + pub fn projection(mut self, projection: Vec) -> ParquetReaderBuilder { + self.projection = Some(projection); + self + } + + /// Builds and initializes a [ParquetReader]. + /// + /// This needs to perform IO operation. 
+ pub async fn build(self) -> Result { let file_path = self.file_handle.file_path(&self.file_dir); - ParquetReader { + let (stream, read_format) = self.init_stream(&file_path).await?; + + Ok(ParquetReader { file_path, file_handle: self.file_handle, object_store: self.object_store, predicate: self.predicate, time_range: self.time_range, - stream: None, - } + projection: self.projection, + stream, + read_format, + batches: Vec::new(), + }) } -} - -type BoxedRecordBatchStream = BoxStream<'static, std::result::Result>; - -/// Parquet batch reader. -pub struct ParquetReader { - /// Path of the file. - file_path: String, - /// SST file to read. - /// - /// Holds the file handle to avoid the file purge purge it. - file_handle: FileHandle, - object_store: ObjectStore, - /// Predicate to push down. - predicate: Option, - /// Time range to filter. - time_range: Option, - - /// Inner parquet record batch stream. - stream: Option, -} - -impl ParquetReader { - /// Initializes the reader and the parquet stream. - async fn maybe_init(&mut self) -> Result<()> { - if self.stream.is_some() { - // Already initialized. - return Ok(()); - } + /// Initializes the parquet stream, also creates a [ReadFormat] to decode record batches. + async fn init_stream(&self, file_path: &str) -> Result<(BoxedRecordBatchStream, ReadFormat)> { + // Creates parquet stream builder. let reader = self .object_store - .reader(&self.file_path) + .reader(file_path) .await .context(OpenDalSnafu)? .compat(); let buf_reader = BufReader::new(reader); let mut builder = ParquetRecordBatchStreamBuilder::new(buf_reader) .await - .context(ReadParquetSnafu { - path: &self.file_path, - })?; + .context(ReadParquetSnafu { path: file_path })?; - // TODO(yingwen): Decode region metadata, create read adapter. + // Decode region metadata. + let key_value_meta = builder.metadata().file_metadata().key_value_metadata(); + let region_meta = self.get_region_metadata(file_path, key_value_meta)?; // Prune row groups by metadata. if let Some(predicate) = &self.predicate { + // TODO(yingwen): Now we encode tags into the full primary key so we need some approach + // to implement pruning. let pruned_row_groups = predicate .prune_row_groups(builder.metadata().row_groups()) .into_iter() @@ -136,36 +138,115 @@ impl ParquetReader { builder = builder.with_row_groups(pruned_row_groups); } - // TODO(yingwen): Projection. + let read_format = ReadFormat::new(Arc::new(region_meta)); + // The arrow schema converted from the region meta should be the same as parquet's. + // We only compare fields to avoid schema's metadata breaks the comparison. + ensure!( + read_format.arrow_schema().fields() == builder.schema().fields(), + InvalidParquetSnafu { + file: file_path, + reason: format!( + "schema mismatch, expect: {:?}, given: {:?}", + read_format.arrow_schema().fields(), + builder.schema().fields() + ) + } + ); + + let parquet_schema_desc = builder.metadata().file_metadata().schema_descr(); + if let Some(column_ids) = self.projection.as_ref() { + let indices = read_format.projection_indices(column_ids.iter().copied()); + let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices); + builder = builder.with_projection(projection_mask); + } + + let stream = builder + .build() + .context(ReadParquetSnafu { path: file_path })?; - let stream = builder.build().context(ReadParquetSnafu { - path: &self.file_path, + Ok((Box::pin(stream), read_format)) + } + + /// Decode region metadata from key value. 
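From a caller's point of view the new read path looks roughly like this (a sketch: the import path, the helper name and the column ids are assumptions, and the builder is constructed elsewhere from the SST directory, file handle and object store):

```rust
use crate::error::Result;
use crate::read::BatchReader;
use crate::sst::parquet::reader::ParquetReaderBuilder;

/// Reads every row of one SST file, decoding only two assumed field columns.
async fn scan_sst(builder: ParquetReaderBuilder) -> Result<usize> {
    // `build` is now async and fallible: it opens the file, restores the region
    // metadata stored under PARQUET_METADATA_KEY and validates the schema.
    let mut reader = builder.projection(vec![2, 4]).build().await?;

    let mut rows = 0;
    // Each returned `Batch` holds the rows of a single primary key.
    while let Some(batch) = reader.next_batch().await? {
        rows += batch.num_rows();
    }
    Ok(rows)
}
```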
+ fn get_region_metadata( + &self, + file_path: &str, + key_value_meta: Option<&Vec>, + ) -> Result { + let key_values = key_value_meta.context(InvalidParquetSnafu { + file: file_path, + reason: "missing key value meta", })?; - self.stream = Some(Box::pin(stream)); + let meta_value = key_values + .iter() + .find(|kv| kv.key == PARQUET_METADATA_KEY) + .with_context(|| InvalidParquetSnafu { + file: file_path, + reason: format!("key {} not found", PARQUET_METADATA_KEY), + })?; + let json = meta_value + .value + .as_ref() + .with_context(|| InvalidParquetSnafu { + file: file_path, + reason: format!("No value for key {}", PARQUET_METADATA_KEY), + })?; - Ok(()) + RegionMetadata::from_json(json).context(InvalidMetadataSnafu) } +} - /// Converts our [Batch] from arrow's [RecordBatch]. - fn convert_arrow_record_batch(&self, _record_batch: RecordBatch) -> Result { - unimplemented!() - } +type BoxedRecordBatchStream = BoxStream<'static, std::result::Result>; + +/// Parquet batch reader to read our SST format. +pub struct ParquetReader { + /// Path of the file. + file_path: String, + /// SST file to read. + /// + /// Holds the file handle to avoid the file purge purge it. + file_handle: FileHandle, + object_store: ObjectStore, + /// Predicate to push down. + predicate: Option, + /// Time range to filter. + time_range: Option, + /// Metadata of columns to read. + /// + /// `None` reads all columns. Due to schema change, the projection + /// can contain columns not in the parquet file. + projection: Option>, + + /// Inner parquet record batch stream. + stream: BoxedRecordBatchStream, + /// Helper to read record batches. + /// + /// Not `None` if [ParquetReader::stream] is not `None`. + read_format: ReadFormat, + /// Buffered batches to return. + batches: Vec, } #[async_trait] impl BatchReader for ParquetReader { async fn next_batch(&mut self) -> Result> { - self.maybe_init().await?; + if let Some(batch) = self.batches.pop() { + return Ok(Some(batch)); + } - self.stream - .as_mut() - .unwrap() - .try_next() - .await - .context(ReadParquetSnafu { - path: &self.file_path, - })? - .map(|rb| self.convert_arrow_record_batch(rb)) - .transpose() + // We need to fetch next record batch and convert it to batches. + let Some(record_batch) = self.stream.try_next().await.context(ReadParquetSnafu { + path: &self.file_path, + })? + else { + return Ok(None); + }; + + self.read_format + .convert_record_batch(&record_batch, &mut self.batches)?; + // Reverse batches so we could pop it. + self.batches.reverse(); + + Ok(self.batches.pop()) } } diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 31c19d942ddd..bd247d3f5c83 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -15,15 +15,17 @@ //! Parquet writer. 
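The reader's `get_region_metadata` and the writer changes below share one contract: the region metadata travels as JSON under `PARQUET_METADATA_KEY`. A sketch of that round trip, with error handling collapsed to `Option` for brevity:

```rust
use parquet::file::metadata::KeyValue;
use store_api::metadata::RegionMetadata;

use crate::sst::parquet::PARQUET_METADATA_KEY;

/// Writer side: serialize the metadata into the parquet key-value metadata.
fn encode_region_metadata(metadata: &RegionMetadata) -> Option<KeyValue> {
    let json = metadata.to_json().ok()?;
    Some(KeyValue::new(PARQUET_METADATA_KEY.to_string(), json))
}

/// Reader side: find the entry and restore the metadata.
fn decode_region_metadata(key_values: &[KeyValue]) -> Option<RegionMetadata> {
    let kv = key_values.iter().find(|kv| kv.key == PARQUET_METADATA_KEY)?;
    RegionMetadata::from_json(kv.value.as_deref()?).ok()
}
```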
use common_telemetry::debug; -use datatypes::arrow::record_batch::RecordBatch; use object_store::ObjectStore; use parquet::basic::{Compression, Encoding, ZstdLevel}; use parquet::file::metadata::KeyValue; use parquet::file::properties::WriterProperties; +use parquet::schema::types::ColumnPath; use snafu::ResultExt; +use store_api::storage::consts::SEQUENCE_COLUMN_NAME; -use crate::error::{InvalidMetadataSnafu, NewRecordBatchSnafu, Result}; +use crate::error::{InvalidMetadataSnafu, Result}; use crate::read::Source; +use crate::sst::parquet::format::WriteFormat; use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY}; use crate::sst::stream_writer::BufferedWriter; @@ -54,38 +56,40 @@ impl<'a> ParquetWriter<'a> { let json = metadata.to_json().context(InvalidMetadataSnafu)?; let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json); + let ts_column = metadata.time_index_column(); - // FIXME(yingwen): encode metadata into key value. + // TODO(yingwen): Find and set proper column encoding for internal columns: op type and tsid. let props_builder = WriterProperties::builder() .set_key_value_metadata(Some(vec![key_value_meta])) .set_compression(Compression::ZSTD(ZstdLevel::default())) .set_encoding(Encoding::PLAIN) - .set_max_row_group_size(opts.row_group_size); - // TODO(yingwen): Set column encoding for internal columns and timestamp. - // e.g. Use DELTA_BINARY_PACKED and disable dictionary for sequence. - + .set_max_row_group_size(opts.row_group_size) + .set_column_encoding( + ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]), + Encoding::DELTA_BINARY_PACKED, + ) + .set_column_dictionary_enabled( + ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]), + false, + ) + .set_column_encoding( + ColumnPath::new(vec![ts_column.column_schema.name.clone()]), + Encoding::DELTA_BINARY_PACKED, + ); let writer_props = props_builder.build(); - let arrow_schema = metadata.schema.arrow_schema(); + let write_format = WriteFormat::new(metadata); let mut buffered_writer = BufferedWriter::try_new( self.file_path.to_string(), self.object_store.clone(), - arrow_schema.clone(), + write_format.arrow_schema(), Some(writer_props), opts.write_buffer_size.as_bytes() as usize, ) .await?; while let Some(batch) = self.source.next_batch().await? { - let arrow_batch = RecordBatch::try_new( - arrow_schema.clone(), - batch - .fields() - .iter() - .map(|v| v.data.to_arrow_array()) - .collect::>(), - ) - .context(NewRecordBatchSnafu)?; + let arrow_batch = write_format.convert_batch(&batch)?; buffered_writer.write(&arrow_batch).await?; } diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs index c09fe9a58719..e123563d80d5 100644 --- a/src/store-api/src/metadata.rs +++ b/src/store-api/src/metadata.rs @@ -23,6 +23,7 @@ use std::sync::Arc; use api::v1::SemanticType; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; +use datatypes::arrow::datatypes::FieldRef; use datatypes::prelude::DataType; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use serde::de::Error; @@ -124,6 +125,11 @@ impl<'de> Deserialize<'de> for RegionMetadata { } impl RegionMetadata { + /// Decode the metadata from a JSON str. + pub fn from_json(s: &str) -> Result { + serde_json::from_str(s).context(SerdeJsonSnafu) + } + /// Encode the metadata to a JSON string. pub fn to_json(&self) -> Result { serde_json::to_string(&self).context(SerdeJsonSnafu) @@ -136,6 +142,11 @@ impl RegionMetadata { .map(|index| &self.column_metadatas[*index]) } + /// Find column index by id. 
+ pub fn column_index_by_id(&self, column_id: ColumnId) -> Option { + self.id_to_index.get(&column_id).copied() + } + /// Returns the time index column /// /// # Panics @@ -145,6 +156,26 @@ impl RegionMetadata { &self.column_metadatas[index] } + /// Returns the arrow field of the time index column. + pub fn time_index_field(&self) -> FieldRef { + let index = self.id_to_index[&self.time_index]; + self.schema.arrow_schema().fields[index].clone() + } + + /// Finds a column by name. + pub fn column_by_name(&self, name: &str) -> Option<&ColumnMetadata> { + self.schema + .column_index_by_name(name) + .map(|index| &self.column_metadatas[index]) + } + + /// Returns all field columns. + pub fn field_columns(&self) -> impl Iterator { + self.column_metadatas + .iter() + .filter(|column| column.semantic_type == SemanticType::Field) + } + /// Checks whether the metadata is valid. fn validate(&self) -> Result<()> { // Id to name. @@ -264,6 +295,7 @@ impl RegionMetadata { /// Checks whether it is a valid column. fn validate_column_metadata(column_metadata: &ColumnMetadata) -> Result<()> { + // TODO(yingwen): Ensure column name is not internal columns. if column_metadata.semantic_type == SemanticType::Timestamp { ensure!( column_metadata diff --git a/src/store-api/src/storage/consts.rs b/src/store-api/src/storage/consts.rs index 50786512a940..c092f4bf204f 100644 --- a/src/store-api/src/storage/consts.rs +++ b/src/store-api/src/storage/consts.rs @@ -81,6 +81,9 @@ pub const SEQUENCE_COLUMN_NAME: &str = "__sequence"; /// Name for reserved column: op_type pub const OP_TYPE_COLUMN_NAME: &str = "__op_type"; +/// Name for reserved column: primary_key +pub const PRIMARY_KEY_COLUMN_NAME: &str = "__primary_key"; + // ----------------------------------------------------------------------------- // ---------- Default options --------------------------------------------------
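Finally, a sketch of how the new `RegionMetadata` helpers behave on a small, made-up schema (mirroring the builder usage in the tests above; names, ids and types are illustrative):

```rust
use api::v1::SemanticType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;

fn region_metadata_helpers() {
    let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
    builder
        .push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new("tag0", ConcreteDataType::int64_datatype(), true),
            semantic_type: SemanticType::Tag,
            column_id: 1,
        })
        .push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new("field0", ConcreteDataType::int64_datatype(), true),
            semantic_type: SemanticType::Field,
            column_id: 2,
        })
        .push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: 3,
        })
        .primary_key(vec![1]);
    let metadata = builder.build().unwrap();

    // New accessors introduced by this patch.
    assert_eq!(Some(1), metadata.column_index_by_id(2));
    assert!(metadata.column_by_name("field0").is_some());
    assert_eq!(1, metadata.field_columns().count());
    let _ts_field = metadata.time_index_field(); // arrow field of the time index

    // JSON round trip used to persist the metadata inside parquet files.
    let json = metadata.to_json().unwrap();
    let decoded = RegionMetadata::from_json(&json).unwrap();
    assert_eq!(1, decoded.field_columns().count());
}
```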