From 3d92c25672dc0948fba410068fe7a29833f82624 Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Wed, 20 Dec 2023 15:24:07 +0800 Subject: [PATCH] feat: can convert Format FileMetaData to ParquetMetaData --- src/mito2/src/sst/parquet.rs | 3 ++ src/mito2/src/sst/parquet/writer.rs | 78 +++++++++++++++++++++++++++-- 2 files changed, 77 insertions(+), 4 deletions(-) diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index af3f8479f39c..fe6adf2c83b7 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -22,6 +22,7 @@ mod stats; pub mod writer; use common_base::readable_size::ReadableSize; +use parquet::file::metadata::ParquetMetaData; use crate::sst::file::FileTimeRange; @@ -59,6 +60,8 @@ pub struct SstInfo { pub file_size: u64, /// Number of rows. pub num_rows: usize, + /// File Meta Data + pub file_metadata: Option, } #[cfg(test)] diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index d776b3ac627d..7f4acf7f50ae 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -14,14 +14,18 @@ //! Parquet writer. +use std::sync::Arc; + use common_datasource::file_format::parquet::BufferedWriter; use common_telemetry::debug; use common_time::Timestamp; use object_store::ObjectStore; -use parquet::basic::{Compression, Encoding, ZstdLevel}; -use parquet::file::metadata::KeyValue; +use parquet::basic::{ColumnOrder, Compression, Encoding, ZstdLevel}; +use parquet::file::metadata::{FileMetaData, KeyValue, ParquetMetaData, RowGroupMetaData}; use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder}; -use parquet::schema::types::ColumnPath; +use parquet::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData}; +use parquet::schema::types; +use parquet::schema::types::{from_thrift, ColumnPath, SchemaDescPtr, SchemaDescriptor}; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; use store_api::storage::consts::SEQUENCE_COLUMN_NAME; @@ -107,15 +111,20 @@ impl ParquetWriter { return Ok(None); } - let (_file_meta, file_size) = buffered_writer.close().await.context(WriteBufferSnafu)?; + let (file_meta, file_size) = buffered_writer.close().await.context(WriteBufferSnafu)?; + // Safety: num rows > 0 so we must have min/max. let time_range = stats.time_range.unwrap(); + // convert file_meta to ParquetMetaData + let parquet_metadata = convert_t_file_meta_to_parquet_meta_data(file_meta); + // object_store.write will make sure all bytes are written or an error is raised. Ok(Some(SstInfo { time_range, file_size, num_rows: stats.num_rows, + file_metadata: parquet_metadata.ok(), })) } @@ -167,3 +176,64 @@ impl SourceStats { } } } + +// refer to https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/file/footer.rs#L72-L90 +/// Convert [TFileMetaData] to [ParquetMetaData] +fn convert_t_file_meta_to_parquet_meta_data( + t_file_metadata: TFileMetaData, +) -> Result { + let schema = from_thrift(&t_file_metadata.schema)?; + let schema_desc_ptr = Arc::new(SchemaDescriptor::new(schema)); + let row_groups: Vec<_> = t_file_metadata + .row_groups + .into_iter() + .map(|rg| RowGroupMetaData::from_thrift(schema_desc_ptr.clone(), rg)?) + .collect(); + + let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_desc_ptr); + + let file_metadata = FileMetaData::new( + t_file_metadata.version, + t_file_metadata.num_rows, + t_file_metadata.created_by, + t_file_metadata.key_value_metadata, + schema_desc_ptr, + column_orders, + ); + + Ok(ParquetMetaData::new(file_metadata, row_groups)) +} + +// Port from https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/file/footer.rs#L106-L137 +/// Parses column orders from Thrift definition. +/// If no column orders are defined, returns `None`. +fn parse_column_orders( + t_column_orders: Option>, + schema_descr: &SchemaDescriptor, +) -> Option> { + match t_column_orders { + Some(orders) => { + // Should always be the case + assert_eq!( + orders.len(), + schema_descr.num_columns(), + "Column order length mismatch" + ); + let mut res = Vec::new(); + for (i, column) in schema_descr.columns().iter().enumerate() { + match orders[i] { + TColumnOrder::TYPEORDER(_) => { + let sort_order = ColumnOrder::get_sort_order( + column.logical_type(), + column.converted_type(), + column.physical_type(), + ); + res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order)); + } + } + } + Some(res) + } + None => None, + } +}