Skip to content

Commit

Permalink
Deprecate methods from footer.rs in favor of ParquetMetaDataReader (#6451)
Browse files Browse the repository at this point in the history

* deprecate methods from footer.rs

* remove mention of parse_metadata from docs for ParquetMetaData
  • Loading branch information
etseidl authored Sep 26, 2024
1 parent 50e9e49 commit 2cc0c16
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 225 deletions.
9 changes: 4 additions & 5 deletions parquet/src/arrow/async_reader/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

use crate::arrow::async_reader::AsyncFileReader;
use crate::errors::{ParquetError, Result};
use crate::file::footer::{decode_footer, decode_metadata};
use crate::file::metadata::ParquetMetaData;
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use crate::file::page_index::index::Index;
use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
use crate::file::FOOTER_SIZE;
Expand Down Expand Up @@ -76,7 +75,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
let mut footer = [0; FOOTER_SIZE];
footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

let length = decode_footer(&footer)?;
let length = ParquetMetaDataReader::decode_footer(&footer)?;

if file_size < length + FOOTER_SIZE {
return Err(ParquetError::EOF(format!(
Expand All @@ -90,13 +89,13 @@ impl<F: MetadataFetch> MetadataLoader<F> {
let (metadata, remainder) = if length > suffix_len - FOOTER_SIZE {
let metadata_start = file_size - length - FOOTER_SIZE;
let meta = fetch.fetch(metadata_start..file_size - FOOTER_SIZE).await?;
(decode_metadata(&meta)?, None)
(ParquetMetaDataReader::decode_metadata(&meta)?, None)
} else {
let metadata_start = file_size - length - FOOTER_SIZE - footer_start;

let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
(
decode_metadata(slice)?,
ParquetMetaDataReader::decode_metadata(slice)?,
Some((footer_start, suffix.slice(..metadata_start))),
)
};
Expand Down
65 changes: 46 additions & 19 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ use crate::bloom_filter::{
};
use crate::column::page::{PageIterator, PageReader};
use crate::errors::{ParquetError, Result};
use crate::file::footer::{decode_footer, decode_metadata};
use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
use crate::file::FOOTER_SIZE;
Expand Down Expand Up @@ -186,14 +185,14 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
let mut buf = [0_u8; FOOTER_SIZE];
self.read_exact(&mut buf).await?;

let metadata_len = decode_footer(&buf)?;
let metadata_len = ParquetMetaDataReader::decode_footer(&buf)?;
self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
.await?;

let mut buf = Vec::with_capacity(metadata_len);
self.take(metadata_len as _).read_to_end(&mut buf).await?;

Ok(Arc::new(decode_metadata(&buf)?))
Ok(Arc::new(ParquetMetaDataReader::decode_metadata(&buf)?))
}
.boxed()
}
Expand Down Expand Up @@ -909,7 +908,7 @@ mod tests {
};
use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
use crate::arrow::ArrowWriter;
use crate::file::footer::parse_metadata;
use crate::file::metadata::ParquetMetaDataReader;
use crate::file::page_index::index_reader;
use crate::file::properties::WriterProperties;
use arrow::compute::kernels::cmp::eq;
Expand Down Expand Up @@ -952,7 +951,9 @@ mod tests {
let path = format!("{testdata}/alltypes_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());

let metadata = parse_metadata(&data).unwrap();
let metadata = ParquetMetaDataReader::new()
.parse_and_finish(&data)
.unwrap();
let metadata = Arc::new(metadata);

assert_eq!(metadata.num_row_groups(), 1);
Expand Down Expand Up @@ -1007,7 +1008,9 @@ mod tests {
let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());

let metadata = parse_metadata(&data).unwrap();
let metadata = ParquetMetaDataReader::new()
.parse_and_finish(&data)
.unwrap();
let metadata = Arc::new(metadata);

assert_eq!(metadata.num_row_groups(), 1);
Expand Down Expand Up @@ -1073,7 +1076,9 @@ mod tests {
let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());

let metadata = parse_metadata(&data).unwrap();
let metadata = ParquetMetaDataReader::new()
.parse_and_finish(&data)
.unwrap();
let metadata = Arc::new(metadata);

assert_eq!(metadata.num_row_groups(), 1);
Expand Down Expand Up @@ -1117,7 +1122,9 @@ mod tests {
let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());

let metadata = parse_metadata(&data).unwrap();
let metadata = ParquetMetaDataReader::new()
.parse_and_finish(&data)
.unwrap();
let metadata = Arc::new(metadata);

assert_eq!(metadata.num_row_groups(), 1);
Expand Down Expand Up @@ -1173,7 +1180,9 @@ mod tests {
let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());

let metadata = parse_metadata(&data).unwrap();
let metadata = ParquetMetaDataReader::new()
.parse_and_finish(&data)
.unwrap();
let metadata = Arc::new(metadata);

assert_eq!(metadata.num_row_groups(), 1);
Expand Down Expand Up @@ -1238,7 +1247,9 @@ mod tests {
let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());

let metadata = parse_metadata(&data).unwrap();
let metadata = ParquetMetaDataReader::new()
.parse_and_finish(&data)
.unwrap();
let metadata = Arc::new(metadata);

assert_eq!(metadata.num_row_groups(), 1);
Expand Down Expand Up @@ -1317,7 +1328,9 @@ mod tests {
writer.close().unwrap();

let data: Bytes = buf.into();
let metadata = parse_metadata(&data).unwrap();
let metadata = ParquetMetaDataReader::new()
.parse_and_finish(&data)
.unwrap();
let parquet_schema = metadata.file_metadata().schema_descr_ptr();

let test = TestReader {
Expand Down Expand Up @@ -1391,7 +1404,9 @@ mod tests {
writer.close().unwrap();

let data: Bytes = buf.into();
let metadata = parse_metadata(&data).unwrap();
let metadata = ParquetMetaDataReader::new()
.parse_and_finish(&data)
.unwrap();

assert_eq!(metadata.num_row_groups(), 2);

Expand Down Expand Up @@ -1479,7 +1494,9 @@ mod tests {
let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());

let metadata = parse_metadata(&data).unwrap();
let metadata = ParquetMetaDataReader::new()
.parse_and_finish(&data)
.unwrap();
let parquet_schema = metadata.file_metadata().schema_descr_ptr();
let metadata = Arc::new(metadata);

Expand Down Expand Up @@ -1529,7 +1546,9 @@ mod tests {
let path = format!("{testdata}/alltypes_tiny_pages.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());

let metadata = parse_metadata(&data).unwrap();
let metadata = ParquetMetaDataReader::new()
.parse_and_finish(&data)
.unwrap();

let offset_index =
index_reader::read_offset_indexes(&data, metadata.row_group(0).columns())
Expand Down Expand Up @@ -1619,7 +1638,9 @@ mod tests {
let path = format!("{testdata}/alltypes_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());

let metadata = parse_metadata(&data).unwrap();
let metadata = ParquetMetaDataReader::new()
.parse_and_finish(&data)
.unwrap();
let file_rows = metadata.file_metadata().num_rows() as usize;
let metadata = Arc::new(metadata);

Expand Down Expand Up @@ -1764,7 +1785,9 @@ mod tests {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());
let metadata = parse_metadata(&data).unwrap();
let metadata = ParquetMetaDataReader::new()
.parse_and_finish(&data)
.unwrap();
let metadata = Arc::new(metadata);
let async_reader = TestReader {
data: data.clone(),
Expand Down Expand Up @@ -1793,7 +1816,9 @@ mod tests {
}

async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
let metadata = parse_metadata(&data).unwrap();
let metadata = ParquetMetaDataReader::new()
.parse_and_finish(&data)
.unwrap();
let metadata = Arc::new(metadata);

assert_eq!(metadata.num_row_groups(), 1);
Expand Down Expand Up @@ -1933,7 +1958,9 @@ mod tests {
writer.close().unwrap();

let data: Bytes = buf.into();
let metadata = parse_metadata(&data).unwrap();
let metadata = ParquetMetaDataReader::new()
.parse_and_finish(&data)
.unwrap();
let parquet_schema = metadata.file_metadata().schema_descr_ptr();

let test = TestReader {
Expand Down
3 changes: 2 additions & 1 deletion parquet/src/bin/parquet-concat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
use clap::Parser;
use parquet::column::writer::ColumnCloseResult;
use parquet::errors::{ParquetError, Result};
use parquet::file::metadata::ParquetMetaDataReader;
use parquet::file::properties::WriterProperties;
use parquet::file::writer::SerializedFileWriter;
use std::fs::File;
Expand Down Expand Up @@ -70,7 +71,7 @@ impl Args {
.iter()
.map(|x| {
let reader = File::open(x)?;
let metadata = parquet::file::footer::parse_metadata(&reader)?;
let metadata = ParquetMetaDataReader::new().parse_and_finish(&reader)?;
Ok((reader, metadata))
})
.collect::<Result<Vec<_>>>()?;
Expand Down
3 changes: 2 additions & 1 deletion parquet/src/bin/parquet-layout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use std::fs::File;
use std::io::Read;

use clap::Parser;
use parquet::file::metadata::ParquetMetaDataReader;
use serde::Serialize;
use thrift::protocol::TCompactInputProtocol;

Expand Down Expand Up @@ -79,7 +80,7 @@ struct Page {
}

fn do_layout<C: ChunkReader>(reader: &C) -> Result<ParquetFile> {
let metadata = parquet::file::footer::parse_metadata(reader)?;
let metadata = ParquetMetaDataReader::new().parse_and_finish(reader)?;
let schema = metadata.file_metadata().schema_descr();

let row_groups = (0..metadata.num_row_groups())
Expand Down
Loading

0 comments on commit 2cc0c16

Please sign in to comment.