Skip to content

Commit

Permalink
feat: can convert Format FileMetaData to ParquetMetaData
Browse files Browse the repository at this point in the history
  • Loading branch information
QuenKar committed Dec 21, 2023
1 parent 6c1c7d8 commit 3d92c25
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 4 deletions.
3 changes: 3 additions & 0 deletions src/mito2/src/sst/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod stats;
pub mod writer;

use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;

use crate::sst::file::FileTimeRange;

Expand Down Expand Up @@ -59,6 +60,8 @@ pub struct SstInfo {
pub file_size: u64,
/// Number of rows.
pub num_rows: usize,
/// Parquet metadata of the written file, if available.
pub file_metadata: Option<ParquetMetaData>,
}

#[cfg(test)]
Expand Down
78 changes: 74 additions & 4 deletions src/mito2/src/sst/parquet/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,18 @@

//! Parquet writer.
use std::sync::Arc;

use common_datasource::file_format::parquet::BufferedWriter;
use common_telemetry::debug;
use common_time::Timestamp;
use object_store::ObjectStore;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::basic::{ColumnOrder, Compression, Encoding, ZstdLevel};
use parquet::file::metadata::{FileMetaData, KeyValue, ParquetMetaData, RowGroupMetaData};
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;
use parquet::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
use parquet::schema::types;
use parquet::schema::types::{from_thrift, ColumnPath, SchemaDescPtr, SchemaDescriptor};
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::SEQUENCE_COLUMN_NAME;
Expand Down Expand Up @@ -107,15 +111,20 @@ impl ParquetWriter {
return Ok(None);
}

let (_file_meta, file_size) = buffered_writer.close().await.context(WriteBufferSnafu)?;
let (file_meta, file_size) = buffered_writer.close().await.context(WriteBufferSnafu)?;

// Safety: num rows > 0 so we must have min/max.
let time_range = stats.time_range.unwrap();

// convert file_meta to ParquetMetaData
let parquet_metadata = convert_t_file_meta_to_parquet_meta_data(file_meta);

// object_store.write will make sure all bytes are written or an error is raised.
Ok(Some(SstInfo {
time_range,
file_size,
num_rows: stats.num_rows,
file_metadata: parquet_metadata.ok(),
}))
}

Expand Down Expand Up @@ -167,3 +176,64 @@ impl SourceStats {
}
}
}

// refer to https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/file/footer.rs#L72-L90
/// Converts a Thrift-level [TFileMetaData] into a [ParquetMetaData].
///
/// The schema is rebuilt from its Thrift form first, every row group is then
/// converted against that schema, and finally the file-level fields (version,
/// row count, creator, key/value metadata, column orders) are carried over.
///
/// # Errors
///
/// Returns an error if the schema or any of the row groups cannot be converted
/// from their Thrift representation. Conversion stops at the first failing row group.
fn convert_t_file_meta_to_parquet_meta_data(
    t_file_metadata: TFileMetaData,
) -> Result<ParquetMetaData> {
    let schema = from_thrift(&t_file_metadata.schema)?;
    let schema_desc_ptr = Arc::new(SchemaDescriptor::new(schema));

    // `?` cannot be used inside the closure (it does not return a `Result`), so map each
    // row group to a `Result` and let `collect` short-circuit on the first error instead.
    let row_groups = t_file_metadata
        .row_groups
        .into_iter()
        .map(|rg| RowGroupMetaData::from_thrift(schema_desc_ptr.clone(), rg))
        .collect::<std::result::Result<Vec<_>, _>>()?;

    let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_desc_ptr);

    let file_metadata = FileMetaData::new(
        t_file_metadata.version,
        t_file_metadata.num_rows,
        t_file_metadata.created_by,
        t_file_metadata.key_value_metadata,
        schema_desc_ptr,
        column_orders,
    );

    Ok(ParquetMetaData::new(file_metadata, row_groups))
}

// Port from https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/file/footer.rs#L106-L137
/// Translates the Thrift column orders into [ColumnOrder] values, one per column
/// of `schema_descr`.
///
/// Returns `None` when the Thrift metadata carries no column orders.
///
/// # Panics
///
/// Panics if the number of Thrift column orders differs from the number of
/// columns reported by the schema descriptor.
fn parse_column_orders(
    t_column_orders: Option<Vec<TColumnOrder>>,
    schema_descr: &SchemaDescriptor,
) -> Option<Vec<ColumnOrder>> {
    // Nothing to translate when the file did not record any column orders.
    let orders = t_column_orders?;

    // Should always be the case
    assert_eq!(
        orders.len(),
        schema_descr.num_columns(),
        "Column order length mismatch"
    );

    // Pair each column with its Thrift order; the lengths were checked above.
    let resolved = schema_descr
        .columns()
        .iter()
        .zip(orders.iter())
        .map(|(column, order)| match order {
            TColumnOrder::TYPEORDER(_) => {
                ColumnOrder::TYPE_DEFINED_ORDER(ColumnOrder::get_sort_order(
                    column.logical_type(),
                    column.converted_type(),
                    column.physical_type(),
                ))
            }
        })
        .collect();

    Some(resolved)
}

0 comments on commit 3d92c25

Please sign in to comment.