test: parquet metadata equal
QuenKar committed Dec 22, 2023
1 parent 3d92c25 commit 4875351
Showing 5 changed files with 170 additions and 71 deletions.
8 changes: 8 additions & 0 deletions src/mito2/src/error.rs
@@ -399,6 +399,13 @@ pub enum Error {
        error: ArrowError,
        location: Location,
    },

    #[snafu(display("Invalid file metadata"))]
    ConvertMetaData {
        location: Location,
        #[snafu(source)]
        error: parquet::errors::ParquetError,
    },
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -459,6 +466,7 @@ impl ErrorExt for Error {
            InvalidBatch { .. } => StatusCode::InvalidArguments,
            InvalidRecordBatch { .. } => StatusCode::InvalidArguments,
            ConvertVector { source, .. } => source.status_code(),
            ConvertMetaData { .. } => StatusCode::Internal,
            ComputeArrow { .. } => StatusCode::Internal,
            ComputeVector { .. } => StatusCode::Internal,
            PrimaryKeyLengthMismatch { .. } => StatusCode::InvalidArguments,
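For reference, the new variant is raised through snafu's context selector, the same pattern helper.rs uses below. A minimal sketch; the wrapper function name is illustrative and not part of this change:

use snafu::ResultExt;

use crate::error::{self, Result};

// Hypothetical wrapper: converts a parquet::errors::ParquetError into
// Error::ConvertMetaData; snafu fills in the Location implicitly.
fn check_metadata(res: parquet::errors::Result<()>) -> Result<()> {
    res.context(error::ConvertMetaDataSnafu)
}
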
64 changes: 64 additions & 0 deletions src/mito2/src/sst/parquet.rs
@@ -15,6 +15,7 @@
//! SST in parquet format.
mod format;
mod helper;
mod page_reader;
pub mod reader;
pub mod row_group;
@@ -198,4 +199,67 @@ mod tests {
        };
        assert!(cache.as_ref().unwrap().get_pages(&page_key).is_none());
    }

    #[tokio::test]
    async fn test_parquet_metadata_eq() {
        // Create the test env.
        let mut env = crate::test_util::TestEnv::new();
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = handle.file_path(FILE_DIR);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        // Write the SST file and get the SST info.
        // The SST info carries the parquet metadata, converted from the thrift
        // FileMetaData returned by the writer.
        let mut writer =
            ParquetWriter::new(file_path, metadata.clone(), source, object_store.clone());
        let sst_info = writer
            .write_all(&write_opts)
            .await
            .unwrap()
            .expect("write_all should return sst info");
        let writer_metadata = sst_info.file_metadata.unwrap();

        // Read the metadata back from the SST file.
        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
        let reader = builder.build().await.unwrap();
        let reader_metadata = reader.parquet_metadata();

        // ParquetMetaData does not implement PartialEq, so compare the fields manually.
        assert_eq!(
            writer_metadata.file_metadata().version(),
            reader_metadata.file_metadata().version()
        );
        assert_eq!(
            writer_metadata.file_metadata().schema_descr(),
            reader_metadata.file_metadata().schema_descr()
        );
        assert_eq!(
            writer_metadata.file_metadata().num_rows(),
            reader_metadata.file_metadata().num_rows()
        );
        assert_eq!(
            writer_metadata.file_metadata().created_by(),
            reader_metadata.file_metadata().created_by()
        );
        assert_eq!(
            writer_metadata.file_metadata().key_value_metadata(),
            reader_metadata.file_metadata().key_value_metadata()
        );
        assert_eq!(
            writer_metadata.file_metadata().column_orders(),
            reader_metadata.file_metadata().column_orders()
        );
        assert_eq!(writer_metadata.row_groups(), reader_metadata.row_groups());
    }
}
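A possible follow-up, not part of this commit: the field-by-field comparison could be factored into a small test helper so other tests can reuse it. A sketch under that assumption:

use parquet::file::metadata::ParquetMetaData;

/// Hypothetical helper: asserts that two ParquetMetaData values are equal
/// field by field, since ParquetMetaData itself does not implement PartialEq.
fn assert_parquet_metadata_eq(expected: &ParquetMetaData, actual: &ParquetMetaData) {
    let (e, a) = (expected.file_metadata(), actual.file_metadata());
    assert_eq!(e.version(), a.version());
    assert_eq!(e.schema_descr(), a.schema_descr());
    assert_eq!(e.num_rows(), a.num_rows());
    assert_eq!(e.created_by(), a.created_by());
    assert_eq!(e.key_value_metadata(), a.key_value_metadata());
    assert_eq!(e.column_orders(), a.column_orders());
    assert_eq!(expected.row_groups(), actual.row_groups());
}
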
86 changes: 86 additions & 0 deletions src/mito2/src/sst/parquet/helper.rs
@@ -0,0 +1,86 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use parquet::basic::ColumnOrder;
use parquet::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData};
use parquet::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
use parquet::schema::types::{from_thrift, SchemaDescriptor};
use snafu::ResultExt;

use crate::error;
use crate::error::Result;

// refer to https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/file/footer.rs#L74-L90
/// Convert [TFileMetaData] to [ParquetMetaData]
pub fn to_parquet_metadata(t_file_metadata: TFileMetaData) -> Result<ParquetMetaData> {
    let schema = from_thrift(&t_file_metadata.schema).context(error::ConvertMetaDataSnafu)?;
    let schema_desc_ptr = Arc::new(SchemaDescriptor::new(schema));

    let mut row_groups = Vec::new();
    for rg in t_file_metadata.row_groups {
        row_groups.push(
            RowGroupMetaData::from_thrift(schema_desc_ptr.clone(), rg)
                .context(error::ConvertMetaDataSnafu)?,
        );
    }
    let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_desc_ptr);

    let file_metadata = FileMetaData::new(
        t_file_metadata.version,
        t_file_metadata.num_rows,
        t_file_metadata.created_by,
        t_file_metadata.key_value_metadata,
        schema_desc_ptr,
        column_orders,
    );
    // Note: the column index and offset index are not populated here, which may
    // become a problem if we enable the page index in the future.
    Ok(ParquetMetaData::new(file_metadata, row_groups))
}

// Port from https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/file/footer.rs#L106-L137
/// Parses column orders from Thrift definition.
/// If no column orders are defined, returns `None`.
fn parse_column_orders(
    t_column_orders: Option<Vec<TColumnOrder>>,
    schema_descr: &SchemaDescriptor,
) -> Option<Vec<ColumnOrder>> {
    match t_column_orders {
        Some(orders) => {
            // Should always be the case
            assert_eq!(
                orders.len(),
                schema_descr.num_columns(),
                "Column order length mismatch"
            );
            let mut res = Vec::new();
            for (i, column) in schema_descr.columns().iter().enumerate() {
                match orders[i] {
                    TColumnOrder::TYPEORDER(_) => {
                        let sort_order = ColumnOrder::get_sort_order(
                            column.logical_type(),
                            column.converted_type(),
                            column.physical_type(),
                        );
                        res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
                    }
                }
            }
            Some(res)
        }
        None => None,
    }
}
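For orientation, a minimal sketch of where a thrift-level FileMetaData comes from and how to_parquet_metadata would consume it. It assumes the arrow_array and parquet crates are available as direct dependencies and that, in the arrow-rs version pinned here, ArrowWriter::close returns the thrift FileMetaData; it is an illustration, not part of this change:

use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use parquet::arrow::ArrowWriter;

fn example() -> crate::error::Result<()> {
    // Write a tiny parquet file into an in-memory buffer.
    let ids = Int64Array::from(vec![1, 2, 3]);
    let batch = RecordBatch::try_from_iter([("id", Arc::new(ids) as ArrayRef)]).unwrap();
    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
    writer.write(&batch).unwrap();
    // Closing the writer yields the thrift FileMetaData of the file just written.
    let thrift_meta = writer.close().unwrap();
    // Convert it into the in-memory ParquetMetaData representation.
    let parquet_meta = to_parquet_metadata(thrift_meta)?;
    assert_eq!(parquet_meta.file_metadata().num_rows(), 3);
    Ok(())
}
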
5 changes: 5 additions & 0 deletions src/mito2/src/sst/parquet/reader.rs
@@ -450,4 +450,9 @@ impl ParquetReader {

        Ok(None)
    }

    #[cfg(test)]
    pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
        self.reader_builder.parquet_meta.clone()
    }
}
78 changes: 7 additions & 71 deletions src/mito2/src/sst/parquet/writer.rs
@@ -14,22 +14,19 @@

//! Parquet writer.
-use std::sync::Arc;

use common_datasource::file_format::parquet::BufferedWriter;
use common_telemetry::debug;
use common_time::Timestamp;
use object_store::ObjectStore;
-use parquet::basic::{ColumnOrder, Compression, Encoding, ZstdLevel};
-use parquet::file::metadata::{FileMetaData, KeyValue, ParquetMetaData, RowGroupMetaData};
+use parquet::basic::{Compression, Encoding, ZstdLevel};
+use parquet::file::metadata::KeyValue;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
-use parquet::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
-use parquet::schema::types;
-use parquet::schema::types::{from_thrift, ColumnPath, SchemaDescPtr, SchemaDescriptor};
+use parquet::schema::types::ColumnPath;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::SEQUENCE_COLUMN_NAME;

+use super::helper::to_parquet_metadata;
use crate::error::{InvalidMetadataSnafu, Result, WriteBufferSnafu};
use crate::read::{Batch, Source};
use crate::sst::parquet::format::WriteFormat;
@@ -116,15 +113,15 @@ impl ParquetWriter {
        // Safety: num rows > 0 so we must have min/max.
        let time_range = stats.time_range.unwrap();

-       // convert file_meta to ParquetMetaData
-       let parquet_metadata = convert_t_file_meta_to_parquet_meta_data(file_meta);
+       // convert FileMetaData to ParquetMetaData
+       let parquet_metadata = to_parquet_metadata(file_meta).ok();

        // object_store.write will make sure all bytes are written or an error is raised.
        Ok(Some(SstInfo {
            time_range,
            file_size,
            num_rows: stats.num_rows,
-           file_metadata: parquet_metadata.ok(),
+           file_metadata: parquet_metadata,
        }))
    }

@@ -176,64 +173,3 @@ impl SourceStats {
        }
    }
}

-// refer to https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/file/footer.rs#L72-L90
-/// Convert [TFileMetaData] to [ParquetMetaData]
-fn convert_t_file_meta_to_parquet_meta_data(
-    t_file_metadata: TFileMetaData,
-) -> Result<ParquetMetaData> {
-    let schema = from_thrift(&t_file_metadata.schema)?;
-    let schema_desc_ptr = Arc::new(SchemaDescriptor::new(schema));
-    let row_groups: Vec<_> = t_file_metadata
-        .row_groups
-        .into_iter()
-        .map(|rg| RowGroupMetaData::from_thrift(schema_desc_ptr.clone(), rg)?)
-        .collect();
-
-    let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_desc_ptr);
-
-    let file_metadata = FileMetaData::new(
-        t_file_metadata.version,
-        t_file_metadata.num_rows,
-        t_file_metadata.created_by,
-        t_file_metadata.key_value_metadata,
-        schema_desc_ptr,
-        column_orders,
-    );
-
-    Ok(ParquetMetaData::new(file_metadata, row_groups))
-}
-
-// Port from https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/file/footer.rs#L106-L137
-/// Parses column orders from Thrift definition.
-/// If no column orders are defined, returns `None`.
-fn parse_column_orders(
-    t_column_orders: Option<Vec<TColumnOrder>>,
-    schema_descr: &SchemaDescriptor,
-) -> Option<Vec<ColumnOrder>> {
-    match t_column_orders {
-        Some(orders) => {
-            // Should always be the case
-            assert_eq!(
-                orders.len(),
-                schema_descr.num_columns(),
-                "Column order length mismatch"
-            );
-            let mut res = Vec::new();
-            for (i, column) in schema_descr.columns().iter().enumerate() {
-                match orders[i] {
-                    TColumnOrder::TYPEORDER(_) => {
-                        let sort_order = ColumnOrder::get_sort_order(
-                            column.logical_type(),
-                            column.converted_type(),
-                            column.physical_type(),
-                        );
-                        res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
-                    }
-                }
-            }
-            Some(res)
-        }
-        None => None,
-    }
-}
