diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index d29450e50a22..46bf52a5ed72 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -399,6 +399,13 @@ pub enum Error { error: ArrowError, location: Location, }, + + #[snafu(display("Invalid file metadata"))] + ConvertMetaData { + location: Location, + #[snafu(source)] + error: parquet::errors::ParquetError, + }, } pub type Result = std::result::Result; @@ -459,6 +466,7 @@ impl ErrorExt for Error { InvalidBatch { .. } => StatusCode::InvalidArguments, InvalidRecordBatch { .. } => StatusCode::InvalidArguments, ConvertVector { source, .. } => source.status_code(), + ConvertMetaData { .. } => StatusCode::Internal, ComputeArrow { .. } => StatusCode::Internal, ComputeVector { .. } => StatusCode::Internal, PrimaryKeyLengthMismatch { .. } => StatusCode::InvalidArguments, diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index fe6adf2c83b7..487872dd9c96 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -15,6 +15,7 @@ //! SST in parquet format. mod format; +mod helper; mod page_reader; pub mod reader; pub mod row_group; @@ -198,4 +199,67 @@ mod tests { }; assert!(cache.as_ref().unwrap().get_pages(&page_key).is_none()); } + + #[tokio::test] + async fn test_parquet_metadata_eq() { + // create test env + let mut env = crate::test_util::TestEnv::new(); + let object_store = env.init_object_store_manager(); + let handle = sst_file_handle(0, 1000); + let file_path = handle.file_path(FILE_DIR); + let metadata = Arc::new(sst_region_metadata()); + let source = new_source(&[ + new_batch_by_range(&["a", "d"], 0, 60), + new_batch_by_range(&["b", "f"], 0, 40), + new_batch_by_range(&["b", "h"], 100, 200), + ]); + let write_opts = WriteOptions { + row_group_size: 50, + ..Default::default() + }; + + // write the sst file and get sst info + // sst info contains the parquet metadata, which is converted from FileMetaData + let mut writer = + ParquetWriter::new(file_path, metadata.clone(), source, object_store.clone()); + let sst_info = writer + .write_all(&write_opts) + .await + .unwrap() + .expect("write_all should return sst info"); + let writer_metadata = sst_info.file_metadata.unwrap(); + + // read the sst file metadata + let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store); + let reader = builder.build().await.unwrap(); + let reader_metadata = reader.parquet_metadata(); + + // Due to ParquetMetaData not implementing PartialEq + // we check the fields manually + assert_eq!( + writer_metadata.file_metadata().version(), + reader_metadata.file_metadata().version() + ); + assert_eq!( + writer_metadata.file_metadata().schema_descr(), + reader_metadata.file_metadata().schema_descr() + ); + assert_eq!( + writer_metadata.file_metadata().num_rows(), + reader_metadata.file_metadata().num_rows() + ); + assert_eq!( + writer_metadata.file_metadata().created_by(), + reader_metadata.file_metadata().created_by() + ); + assert_eq!( + writer_metadata.file_metadata().key_value_metadata(), + reader_metadata.file_metadata().key_value_metadata() + ); + assert_eq!( + writer_metadata.file_metadata().column_orders(), + reader_metadata.file_metadata().column_orders() + ); + assert_eq!(writer_metadata.row_groups(), reader_metadata.row_groups()); + } } diff --git a/src/mito2/src/sst/parquet/helper.rs b/src/mito2/src/sst/parquet/helper.rs new file mode 100644 index 000000000000..16a805b2b9ca --- /dev/null +++ b/src/mito2/src/sst/parquet/helper.rs @@ -0,0 +1,86 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use parquet::basic::ColumnOrder; +use parquet::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData}; +use parquet::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData}; +use parquet::schema::types::{from_thrift, SchemaDescriptor}; +use snafu::ResultExt; + +use crate::error; +use crate::error::Result; + +// refer to https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/file/footer.rs#L74-L90 +/// Convert [TFileMetaData] to [ParquetMetaData] +pub fn to_parquet_metadata(t_file_metadata: TFileMetaData) -> Result { + let schema = from_thrift(&t_file_metadata.schema).context(error::ConvertMetaDataSnafu)?; + let schema_desc_ptr = Arc::new(SchemaDescriptor::new(schema)); + + let mut row_groups = Vec::new(); + for rg in t_file_metadata.row_groups { + row_groups.push( + RowGroupMetaData::from_thrift(schema_desc_ptr.clone(), rg) + .context(error::ConvertMetaDataSnafu)?, + ); + } + let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_desc_ptr); + + let file_metadata = FileMetaData::new( + t_file_metadata.version, + t_file_metadata.num_rows, + t_file_metadata.created_by, + t_file_metadata.key_value_metadata, + schema_desc_ptr, + column_orders, + ); + // There may be a problem owing to lacking of column_index and offset_index, + // if we open page index in the future. + Ok(ParquetMetaData::new(file_metadata, row_groups)) +} + +// Port from https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/file/footer.rs#L106-L137 +/// Parses column orders from Thrift definition. +/// If no column orders are defined, returns `None`. +fn parse_column_orders( + t_column_orders: Option>, + schema_descr: &SchemaDescriptor, +) -> Option> { + match t_column_orders { + Some(orders) => { + // Should always be the case + assert_eq!( + orders.len(), + schema_descr.num_columns(), + "Column order length mismatch" + ); + let mut res = Vec::new(); + for (i, column) in schema_descr.columns().iter().enumerate() { + match orders[i] { + TColumnOrder::TYPEORDER(_) => { + let sort_order = ColumnOrder::get_sort_order( + column.logical_type(), + column.converted_type(), + column.physical_type(), + ); + res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order)); + } + } + } + Some(res) + } + None => None, + } +} diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 45e36786d41d..a55afc5893cd 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -450,4 +450,9 @@ impl ParquetReader { Ok(None) } + + #[cfg(test)] + pub fn parquet_metadata(&self) -> Arc { + self.reader_builder.parquet_meta.clone() + } } diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 7f4acf7f50ae..770fe161eafc 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -14,22 +14,19 @@ //! Parquet writer. -use std::sync::Arc; - use common_datasource::file_format::parquet::BufferedWriter; use common_telemetry::debug; use common_time::Timestamp; use object_store::ObjectStore; -use parquet::basic::{ColumnOrder, Compression, Encoding, ZstdLevel}; -use parquet::file::metadata::{FileMetaData, KeyValue, ParquetMetaData, RowGroupMetaData}; +use parquet::basic::{Compression, Encoding, ZstdLevel}; +use parquet::file::metadata::KeyValue; use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder}; -use parquet::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData}; -use parquet::schema::types; -use parquet::schema::types::{from_thrift, ColumnPath, SchemaDescPtr, SchemaDescriptor}; +use parquet::schema::types::ColumnPath; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; use store_api::storage::consts::SEQUENCE_COLUMN_NAME; +use super::helper::to_parquet_metadata; use crate::error::{InvalidMetadataSnafu, Result, WriteBufferSnafu}; use crate::read::{Batch, Source}; use crate::sst::parquet::format::WriteFormat; @@ -116,15 +113,15 @@ impl ParquetWriter { // Safety: num rows > 0 so we must have min/max. let time_range = stats.time_range.unwrap(); - // convert file_meta to ParquetMetaData - let parquet_metadata = convert_t_file_meta_to_parquet_meta_data(file_meta); + // convert FileMetaData to ParquetMetaData + let parquet_metadata = to_parquet_metadata(file_meta).ok(); // object_store.write will make sure all bytes are written or an error is raised. Ok(Some(SstInfo { time_range, file_size, num_rows: stats.num_rows, - file_metadata: parquet_metadata.ok(), + file_metadata: parquet_metadata, })) } @@ -176,64 +173,3 @@ impl SourceStats { } } } - -// refer to https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/file/footer.rs#L72-L90 -/// Convert [TFileMetaData] to [ParquetMetaData] -fn convert_t_file_meta_to_parquet_meta_data( - t_file_metadata: TFileMetaData, -) -> Result { - let schema = from_thrift(&t_file_metadata.schema)?; - let schema_desc_ptr = Arc::new(SchemaDescriptor::new(schema)); - let row_groups: Vec<_> = t_file_metadata - .row_groups - .into_iter() - .map(|rg| RowGroupMetaData::from_thrift(schema_desc_ptr.clone(), rg)?) - .collect(); - - let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_desc_ptr); - - let file_metadata = FileMetaData::new( - t_file_metadata.version, - t_file_metadata.num_rows, - t_file_metadata.created_by, - t_file_metadata.key_value_metadata, - schema_desc_ptr, - column_orders, - ); - - Ok(ParquetMetaData::new(file_metadata, row_groups)) -} - -// Port from https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/file/footer.rs#L106-L137 -/// Parses column orders from Thrift definition. -/// If no column orders are defined, returns `None`. -fn parse_column_orders( - t_column_orders: Option>, - schema_descr: &SchemaDescriptor, -) -> Option> { - match t_column_orders { - Some(orders) => { - // Should always be the case - assert_eq!( - orders.len(), - schema_descr.num_columns(), - "Column order length mismatch" - ); - let mut res = Vec::new(); - for (i, column) in schema_descr.columns().iter().enumerate() { - match orders[i] { - TColumnOrder::TYPEORDER(_) => { - let sort_order = ColumnOrder::get_sort_order( - column.logical_type(), - column.converted_type(), - column.physical_type(), - ); - res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order)); - } - } - } - Some(res) - } - None => None, - } -}