Improve ParquetRecordBatchStreamBuilder docs / examples #6948
@@ -15,65 +15,13 @@
// specific language governing permissions and limitations
// under the License.

-//! Provides `async` API for reading parquet files as
+//! [`ParquetRecordBatchStreamBuilder`]: `async` API for reading Parquet files as
//! [`RecordBatch`]es
-//!
-//! ```
-//! # #[tokio::main(flavor="current_thread")]
-//! # async fn main() {
-//! #
-//! # use arrow_array::RecordBatch;
-//! # use arrow::util::pretty::pretty_format_batches;
-//! # use futures::TryStreamExt;
-//! # use tokio::fs::File;
-//! #
-//! # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
-//! #
-//! # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
-//! #     let formatted = pretty_format_batches(batches).unwrap().to_string();
-//! #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
-//! #     assert_eq!(
-//! #         &actual_lines, expected_lines,
-//! #         "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
-//! #         expected_lines, actual_lines
-//! #     );
-//! # }
-//! #
-//! let testdata = arrow::util::test_util::parquet_test_data();
-//! let path = format!("{}/alltypes_plain.parquet", testdata);
-//! let file = File::open(path).await.unwrap();
-//!
-//! let builder = ParquetRecordBatchStreamBuilder::new(file)
-//!     .await
-//!     .unwrap()
-//!     .with_batch_size(3);
-//!
-//! let file_metadata = builder.metadata().file_metadata();
-//! let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]);
-//!
-//! let stream = builder.with_projection(mask).build().unwrap();
-//! let results = stream.try_collect::<Vec<_>>().await.unwrap();
-//! assert_eq!(results.len(), 3);
-//!
-//! assert_batches_eq(
-//!     &results,
-//!     &[
-//!         "+----------+-------------+-----------+",
-//!         "| bool_col | tinyint_col | float_col |",
-//!         "+----------+-------------+-----------+",
-//!         "| true     | 0           | 0.0       |",
-//!         "| false    | 1           | 1.1       |",
-//!         "| true     | 0           | 0.0       |",
-//!         "| false    | 1           | 1.1       |",
-//!         "| true     | 0           | 0.0       |",
-//!         "| false    | 1           | 1.1       |",
-//!         "| true     | 0           | 0.0       |",
-//!         "| false    | 1           | 1.1       |",
-//!         "+----------+-------------+-----------+",
-//!     ],
-//! );
-//! # }
-//! ```
+//!
+//! This can be used to decode a Parquet file in streaming fashion (without
+//! downloading the whole file at once) from a remote source, such as an object store.
+//!
+//! See example on [`ParquetRecordBatchStreamBuilder`]

use std::collections::VecDeque;
use std::fmt::Formatter;
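The new module docs mention decoding from a remote source such as an object store. Below is a minimal sketch of that usage, assuming the `object_store` crate's `LocalFileSystem` together with the `ParquetObjectReader` adapter (enabled by the parquet crate's `object_store` feature). The `ParquetObjectReader` constructor has changed across arrow-rs versions; taking the store plus the object's `ObjectMeta` is an assumption here:

```rust
use std::sync::Arc;

use futures::TryStreamExt;
use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
use parquet::arrow::async_reader::ParquetObjectReader;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    // Any ObjectStore implementation (S3, GCS, ...) works the same way; a
    // local store keeps this sketch self-contained.
    let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
    let testdata = arrow::util::test_util::parquet_test_data();
    let abs = std::fs::canonicalize(format!("{testdata}/alltypes_plain.parquet")).unwrap();
    let path = Path::from_filesystem_path(abs).unwrap();

    // Fetch the object's size/metadata, then build a reader that issues
    // byte-range requests on demand instead of downloading the whole file
    let meta = store.head(&path).await.unwrap();
    let reader = ParquetObjectReader::new(store, meta);

    let stream = ParquetRecordBatchStreamBuilder::new(reader)
        .await
        .unwrap()
        .build()
        .unwrap();
    let batches = stream.try_collect::<Vec<_>>().await.unwrap();
    assert!(!batches.is_empty());
}
```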
@@ -249,53 +197,153 @@ impl ArrowReaderMetadata {
/// breaking the pre-existing ParquetRecordBatchStreamBuilder API
pub struct AsyncReader<T>(T);

-/// A builder used to construct a [`ParquetRecordBatchStream`] for `async` reading of a parquet file
+/// A builder for reading parquet files from an `async` source as [`ParquetRecordBatchStream`]
///
-/// In particular, this handles reading the parquet file metadata, allowing consumers
+/// This builder handles reading the parquet file metadata, allowing consumers
/// to use this information to select what specific columns, row groups, etc...
/// they wish to be read by the resulting stream
///
+/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
+///
/// See [`ArrowReaderBuilder`] for additional member functions
pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;

impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
-    /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
+    /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
+    /// specified source.
    ///
    /// # Example
    /// ```
+    /// # #[tokio::main(flavor="current_thread")]
+    /// # async fn main() {
+    /// #
+    /// # use arrow_array::RecordBatch;
+    /// # use arrow::util::pretty::pretty_format_batches;
+    /// # use futures::TryStreamExt;
+    /// #
+    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+    /// #
+    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
+    /// #     let formatted = pretty_format_batches(batches).unwrap().to_string();
+    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
+    /// #     assert_eq!(
+    /// #         &actual_lines, expected_lines,
+    /// #         "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+    /// #         expected_lines, actual_lines
+    /// #     );
+    /// # }
+    /// #
+    /// # let testdata = arrow::util::test_util::parquet_test_data();
+    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
+    /// // Use tokio::fs::File to read data using async I/O. This can be replaced with
+    /// // another async I/O reader, such as a reader from an object store.
+    /// let file = tokio::fs::File::open(path).await.unwrap();
+    ///
+    /// // Configure options for reading from the async source
+    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
+    ///     .await
+    ///     .unwrap();
+    /// // Building the stream opens the parquet file (reads metadata, etc.) and returns
+    /// // a stream that can be used to incrementally read the data in batches
+    /// let stream = builder.build().unwrap();
+    /// // In this example we collect the stream into a Vec<RecordBatch>,
+    /// // but real applications would likely process the batches as they are read
+    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
+    /// // demonstrate the results are as expected
+    /// assert_batches_eq(
+    ///     &results,
+    ///     &[
+    ///         "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+    ///         "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col  | string_col | timestamp_col       |",
+    ///         "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+    ///         "| 4  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30332f30312f3039 | 30         | 2009-03-01T00:00:00 |",
+    ///         "| 5  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30332f30312f3039 | 31         | 2009-03-01T00:01:00 |",
+    ///         "| 6  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30342f30312f3039 | 30         | 2009-04-01T00:00:00 |",
+    ///         "| 7  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30342f30312f3039 | 31         | 2009-04-01T00:01:00 |",
+    ///         "| 2  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30322f30312f3039 | 30         | 2009-02-01T00:00:00 |",
+    ///         "| 3  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30322f30312f3039 | 31         | 2009-02-01T00:01:00 |",
+    ///         "| 0  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30312f30312f3039 | 30         | 2009-01-01T00:00:00 |",
+    ///         "| 1  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30312f30312f3039 | 31         | 2009-01-01T00:01:00 |",
+    ///         "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+    ///     ],
+    /// );
+    /// # }
+    /// ```

> Review comment: made the first example select all columns to make it simpler to understand

+    ///
+    /// # Example configuring options and reading metadata

> Review comment: Made a more advanced example with projection pushdown

+    ///
+    /// There are many options that control the behavior of the reader, such as
+    /// `with_batch_size`, `with_projection`, `with_filter`, etc...
+    ///
+    /// ```

> Review comment: the existing example is less compelling in my mind than what was on the module level docs, so I remove the current one

-    /// # use std::fs::metadata;
-    /// # use std::sync::Arc;
-    /// # use bytes::Bytes;
-    /// # use arrow_array::{Int32Array, RecordBatch};
-    /// # use arrow_schema::{DataType, Field, Schema};
-    /// # use parquet::arrow::arrow_reader::ArrowReaderMetadata;
-    /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
-    /// # use tempfile::tempfile;
-    /// # use futures::StreamExt;
    /// # #[tokio::main(flavor="current_thread")]
    /// # async fn main() {
    /// #
-    /// # let mut file = tempfile().unwrap();
-    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
-    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
-    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
-    /// # writer.write(&batch).unwrap();
-    /// # writer.close().unwrap();
-    /// // Open async file containing parquet data
-    /// let mut file = tokio::fs::File::from_std(file);
-    /// // construct the reader
-    /// let mut reader = ParquetRecordBatchStreamBuilder::new(file)
-    ///     .await.unwrap().build().unwrap();
-    /// // Read batche
-    /// let batch: RecordBatch = reader.next().await.unwrap().unwrap();
+    /// # use arrow_array::RecordBatch;
+    /// # use arrow::util::pretty::pretty_format_batches;
+    /// # use futures::TryStreamExt;
+    /// #
+    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+    /// #
+    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
+    /// #     let formatted = pretty_format_batches(batches).unwrap().to_string();
+    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
+    /// #     assert_eq!(
+    /// #         &actual_lines, expected_lines,
+    /// #         "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+    /// #         expected_lines, actual_lines
+    /// #     );
+    /// # }
+    /// #
+    /// # let testdata = arrow::util::test_util::parquet_test_data();
+    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
+    /// // As before, use tokio::fs::File to read data using async I/O.
+    /// let file = tokio::fs::File::open(path).await.unwrap();
+    ///
+    /// // Configure options for reading from the async source; in this case we set
+    /// // the batch size to 3, which produces batches of 3 rows at a time.
+    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
+    ///     .await
+    ///     .unwrap()
+    ///     .with_batch_size(3);
+    ///
+    /// // We can also inspect the schema and other metadata
+    /// // before actually reading the data
+    /// let file_metadata = builder.metadata().file_metadata();
+    /// // Specify that we only want to read columns 1, 2, and 6
+    /// // (bool_col, tinyint_col, and float_col)
+    /// let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]);
+    ///
+    /// let stream = builder.with_projection(mask).build().unwrap();
+    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
+    /// // demonstrate the results are as expected
+    /// assert_batches_eq(
+    ///     &results,
+    ///     &[
+    ///         "+----------+-------------+-----------+",
+    ///         "| bool_col | tinyint_col | float_col |",
+    ///         "+----------+-------------+-----------+",
+    ///         "| true     | 0           | 0.0       |",
+    ///         "| false    | 1           | 1.1       |",
+    ///         "| true     | 0           | 0.0       |",
+    ///         "| false    | 1           | 1.1       |",
+    ///         "| true     | 0           | 0.0       |",
+    ///         "| false    | 1           | 1.1       |",
+    ///         "| true     | 0           | 0.0       |",
+    ///         "| false    | 1           | 1.1       |",
+    ///         "+----------+-------------+-----------+",
+    ///     ],
+    /// );
+    ///
+    /// // The results have 8 rows, so with a batch size of 3 we expect
+    /// // 3 batches: two with 3 rows each and a final batch with 2 rows.
+    /// assert_eq!(results.len(), 3);
+    /// # }
+    /// ```
    pub async fn new(input: T) -> Result<Self> {
        Self::new_with_options(input, Default::default()).await
    }

-    /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
+    /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided async source
    /// and [`ArrowReaderOptions`]
    pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
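Since `new_with_options` is touched here but has no example of its own, here is a minimal sketch of how it differs from `new`, using `ArrowReaderOptions::with_page_index` as the illustrative option:

```rust
use futures::TryStreamExt;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{}/alltypes_plain.parquet", testdata);
    let file = tokio::fs::File::open(path).await.unwrap();

    // Ask the reader to also load the page index while reading the file
    // metadata; other options control schema handling, and so on.
    let options = ArrowReaderOptions::new().with_page_index(true);
    let builder = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
        .await
        .unwrap();

    // The builder is then used exactly like one constructed by `new`
    let stream = builder.build().unwrap();
    let batches = stream.try_collect::<Vec<_>>().await.unwrap();
    assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 8);
}
```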
@@ -352,6 +400,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
    }

    /// Read bloom filter for a column in a row group
    ///
+    /// Returns `None` if the column does not have a bloom filter
    ///
    /// We should call this function after other forms of pruning, such as projection and predicate pushdown.
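The method this doc comment belongs to is elided from the diff; assuming it is `get_row_group_column_bloom_filter` (the async builder's bloom filter accessor), here is a sketch of the row-group pruning workflow the comment describes:

```rust
use futures::TryStreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{}/alltypes_plain.parquet", testdata);
    let file = tokio::fs::File::open(path).await.unwrap();

    let mut builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();

    // Keep only the row groups whose bloom filter might contain the value
    // "31" in column 9 (string_col in this file's schema)
    let mut keep = Vec::new();
    for row_group_idx in 0..builder.metadata().num_row_groups() {
        let sbbf = builder
            .get_row_group_column_bloom_filter(row_group_idx, 9)
            .await
            .unwrap();
        // `None` means no bloom filter was written: the row group must be kept
        if sbbf.map_or(true, |f| f.check("31")) {
            keep.push(row_group_idx);
        }
    }

    // Read only the row groups that might match
    let stream = builder.with_row_groups(keep).build().unwrap();
    let batches = stream.try_collect::<Vec<_>>().await.unwrap();
    // alltypes_plain.parquet has no bloom filters, so nothing is pruned here
    assert!(!batches.is_empty());
}
```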
@@ -415,6 +464,8 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
    }

    /// Build a new [`ParquetRecordBatchStream`]
+    ///
+    /// See examples on [`ParquetRecordBatchStreamBuilder::new`]
    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
        let num_row_groups = self.metadata.row_groups().len();
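The first example's comment notes that real applications would process batches as they are read rather than collecting them all; a minimal sketch of that incremental pattern against the stream that `build` returns:

```rust
use futures::TryStreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{}/alltypes_plain.parquet", testdata);
    let file = tokio::fs::File::open(path).await.unwrap();

    let mut stream = ParquetRecordBatchStreamBuilder::new(file)
        .await
        .unwrap()
        .with_batch_size(3)
        .build()
        .unwrap();

    // Each awaited item is one decoded RecordBatch; only the data needed for
    // the current batch must be resident at any point
    let mut total_rows = 0;
    while let Some(batch) = stream.try_next().await.unwrap() {
        total_rows += batch.num_rows();
    }
    assert_eq!(total_rows, 8); // alltypes_plain.parquet contains 8 rows
}
```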
> Review comment: moved examples to ParquetRecordBatchStreamBuilder
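The advanced example mentions `with_filter` but does not demonstrate it; below is a sketch of predicate pushdown via `RowFilter` and `ArrowPredicateFn`, keeping only the rows where `bool_col` is true:

```rust
use arrow_array::cast::AsArray;
use futures::TryStreamExt;
use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{}/alltypes_plain.parquet", testdata);
    let file = tokio::fs::File::open(path).await.unwrap();

    let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
    let schema_descr = builder.metadata().file_metadata().schema_descr();

    // The predicate only needs bool_col (root column 1), so only that column
    // is decoded while the filter is evaluated
    let predicate_mask = ProjectionMask::roots(schema_descr, [1]);
    let predicate = ArrowPredicateFn::new(predicate_mask, |batch| {
        // Rows where bool_col is true are kept
        Ok(batch.column(0).as_boolean().clone())
    });

    let stream = builder
        .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
        .build()
        .unwrap();
    let batches = stream.try_collect::<Vec<_>>().await.unwrap();
    let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(rows, 4); // 4 of the 8 rows have bool_col == true
}
```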