Improve ParquetRecordBatchStreamBuilder docs / examples #6948

Merged 3 commits on Jan 10, 2025
Changes from 1 commit
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 133 additions & 82 deletions parquet/src/arrow/async_reader/mod.rs
@@ -15,65 +15,13 @@
// specific language governing permissions and limitations
// under the License.

//! Provides `async` API for reading parquet files as
//! [`ParquetRecordBatchStreamBuilder`]: `async` API for reading Parquet files as
//! [`RecordBatch`]es
//!
//! ```
//! # #[tokio::main(flavor="current_thread")]
//! # async fn main() {
//! #
//! # use arrow_array::RecordBatch;
//! # use arrow::util::pretty::pretty_format_batches;
//! # use futures::TryStreamExt;
//! # use tokio::fs::File;
//! #
//! # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
//! #
//! # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
//! # let formatted = pretty_format_batches(batches).unwrap().to_string();
//! # let actual_lines: Vec<_> = formatted.trim().lines().collect();
//! # assert_eq!(
//! # &actual_lines, expected_lines,
//! # "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
//! # expected_lines, actual_lines
//! # );
//! # }
//! #
//! let testdata = arrow::util::test_util::parquet_test_data();
//! let path = format!("{}/alltypes_plain.parquet", testdata);
//! let file = File::open(path).await.unwrap();
//! This can be used to decode a Parquet file in streaming fashion (without
//! downloading the whole file at once) from a remote source, such as an object store.
//!
//! let builder = ParquetRecordBatchStreamBuilder::new(file)
//! .await
//! .unwrap()
//! .with_batch_size(3);
//!
//! let file_metadata = builder.metadata().file_metadata();
//! let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]);
//!
//! let stream = builder.with_projection(mask).build().unwrap();
//! let results = stream.try_collect::<Vec<_>>().await.unwrap();
//! assert_eq!(results.len(), 3);
//!
//! assert_batches_eq(
//! &results,
//! &[
//! "+----------+-------------+-----------+",
//! "| bool_col | tinyint_col | float_col |",
//! "+----------+-------------+-----------+",
//! "| true | 0 | 0.0 |",
//! "| false | 1 | 1.1 |",
//! "| true | 0 | 0.0 |",
//! "| false | 1 | 1.1 |",
//! "| true | 0 | 0.0 |",
//! "| false | 1 | 1.1 |",
//! "| true | 0 | 0.0 |",
//! "| false | 1 | 1.1 |",
//! "+----------+-------------+-----------+",
//! ],
//! );
//! # }
//! ```
//! See example on [`ParquetRecordBatchStreamBuilder`]
Contributor Author: moved examples to ParquetRecordBatchStreamBuilder

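The new module-level docs above note that this API can decode a Parquet file in streaming fashion from a remote source such as an object store. A minimal sketch of that pattern, assuming the parquet crate is built with the `object_store` feature so that `ParquetObjectReader` is available (its constructor has taken slightly different arguments across versions), and using a local filesystem store plus a placeholder path in place of a real remote store:

```rust
use std::sync::Arc;

use futures::TryStreamExt;
use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
use parquet::arrow::async_reader::ParquetObjectReader;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Any ObjectStore implementation works (S3, GCS, Azure, ...); a local
    // filesystem store stands in for a remote store in this sketch.
    let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new_with_prefix("/tmp")?);
    let location = Path::from("data.parquet"); // placeholder object path

    // Fetch the object metadata (size etc.) so the reader can plan range requests.
    let meta = store.head(&location).await?;

    // ParquetObjectReader implements AsyncFileReader and fetches only the byte
    // ranges it needs, so the whole file is never downloaded at once.
    let reader = ParquetObjectReader::new(store, meta);
    let mut stream = ParquetRecordBatchStreamBuilder::new(reader)
        .await?
        .with_batch_size(8192)
        .build()?;

    // Process record batches incrementally as they are decoded.
    while let Some(batch) = stream.try_next().await? {
        println!("read a batch with {} rows", batch.num_rows());
    }
    Ok(())
}
```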

use std::collections::VecDeque;
use std::fmt::Formatter;
@@ -249,53 +197,153 @@ impl ArrowReaderMetadata
/// breaking the pre-existing ParquetRecordBatchStreamBuilder API
pub struct AsyncReader<T>(T);

/// A builder used to construct a [`ParquetRecordBatchStream`] for `async` reading of a parquet file
/// A builder for reading parquet files from an `async` source as [`ParquetRecordBatchStream`]
///
/// In particular, this handles reading the parquet file metadata, allowing consumers
/// This builder handles reading the parquet file metadata, allowing consumers
/// to use this information to select what specific columns, row groups, etc...
/// they wish to be read by the resulting stream
///
/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
///
/// See [`ArrowReaderBuilder`] for additional member functions
pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;

impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
/// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
/// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
/// specified source.
///
/// # Example
/// ```
/// # #[tokio::main(flavor="current_thread")]
/// # async fn main() {
/// #
/// # use arrow_array::RecordBatch;
/// # use arrow::util::pretty::pretty_format_batches;
/// # use futures::TryStreamExt;
/// #
/// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
/// #
/// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
/// # let formatted = pretty_format_batches(batches).unwrap().to_string();
/// # let actual_lines: Vec<_> = formatted.trim().lines().collect();
/// # assert_eq!(
/// # &actual_lines, expected_lines,
/// # "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
/// # expected_lines, actual_lines
/// # );
/// # }
/// #
/// # let testdata = arrow::util::test_util::parquet_test_data();
/// # let path = format!("{}/alltypes_plain.parquet", testdata);
/// // Use tokio::fs::File to read data using async I/O. This can be replaced with
/// // another async I/O reader, such as a reader from an object store.
/// let file = tokio::fs::File::open(path).await.unwrap();
///
/// // Configure options for reading from the async source
/// let builder = ParquetRecordBatchStreamBuilder::new(file)
/// .await
/// .unwrap();
/// // Building the stream opens the parquet file (reads metadata, etc) and returns
/// // a stream that can be used to incrementally read the data in batches
/// let stream = builder.build().unwrap();
/// // in this example, we collect the stream into a Vec<RecordBatch>
/// // but real applications would likely process the batches as they are read
/// let results = stream.try_collect::<Vec<_>>().await.unwrap();
/// // demonstrate the results are as expected
/// assert_batches_eq(
/// &results,
/// &[
/// "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
Contributor Author: made the first example select all columns to make it simpler to understand

/// "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col | string_col | timestamp_col |",
/// "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
/// "| 4 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30332f30312f3039 | 30 | 2009-03-01T00:00:00 |",
/// "| 5 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30332f30312f3039 | 31 | 2009-03-01T00:01:00 |",
/// "| 6 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30342f30312f3039 | 30 | 2009-04-01T00:00:00 |",
/// "| 7 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30342f30312f3039 | 31 | 2009-04-01T00:01:00 |",
/// "| 2 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30322f30312f3039 | 30 | 2009-02-01T00:00:00 |",
/// "| 3 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30322f30312f3039 | 31 | 2009-02-01T00:01:00 |",
/// "| 0 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30312f30312f3039 | 30 | 2009-01-01T00:00:00 |",
/// "| 1 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30312f30312f3039 | 31 | 2009-01-01T00:01:00 |",
/// "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
/// ],
/// );
/// # }
/// ```
///
/// # Example configuring options and reading metadata
Contributor Author: Made a more advanced example with projection pushdown

///
/// There are many options that control the behavior of the reader, such as
/// `with_batch_size`, `with_projection`, `with_filter`, etc...
///
/// ```
/// # use std::fs::metadata;
Contributor Author: the existing example is less compelling in my mind than what was on the module-level docs, so I removed the current one

/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// # use arrow_array::{Int32Array, RecordBatch};
/// # use arrow_schema::{DataType, Field, Schema};
/// # use parquet::arrow::arrow_reader::ArrowReaderMetadata;
/// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
/// # use tempfile::tempfile;
/// # use futures::StreamExt;
/// # #[tokio::main(flavor="current_thread")]
/// # async fn main() {
/// #
/// # let mut file = tempfile().unwrap();
/// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
/// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
/// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
/// # writer.write(&batch).unwrap();
/// # writer.close().unwrap();
/// // Open async file containing parquet data
/// let mut file = tokio::fs::File::from_std(file);
/// // construct the reader
/// let mut reader = ParquetRecordBatchStreamBuilder::new(file)
/// .await.unwrap().build().unwrap();
/// // Read a batch
/// let batch: RecordBatch = reader.next().await.unwrap().unwrap();
/// # use arrow_array::RecordBatch;
/// # use arrow::util::pretty::pretty_format_batches;
/// # use futures::TryStreamExt;
/// #
/// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
/// #
/// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
/// # let formatted = pretty_format_batches(batches).unwrap().to_string();
/// # let actual_lines: Vec<_> = formatted.trim().lines().collect();
/// # assert_eq!(
/// # &actual_lines, expected_lines,
/// # "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
/// # expected_lines, actual_lines
/// # );
/// # }
/// #
/// # let testdata = arrow::util::test_util::parquet_test_data();
/// # let path = format!("{}/alltypes_plain.parquet", testdata);
/// // As before, use tokio::fs::File to read data using async I/O.
/// let file = tokio::fs::File::open(path).await.unwrap();
///
/// // Configure options for reading from the async source. In this case we set the
/// // batch size to 3, which produces batches of up to 3 rows at a time.
/// let builder = ParquetRecordBatchStreamBuilder::new(file)
/// .await
/// .unwrap()
/// .with_batch_size(3);
///
/// // We can also read the metadata to inspect the schema and other metadata
/// // before actually reading the data
/// let file_metadata = builder.metadata().file_metadata();
/// // Specify that we only want to read the columns at indices 1, 2, and 6
/// // (bool_col, tinyint_col, and float_col)
/// let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]);
///
/// let stream = builder.with_projection(mask).build().unwrap();
/// let results = stream.try_collect::<Vec<_>>().await.unwrap();
/// // check the results are as expected
/// assert_batches_eq(
/// &results,
/// &[
/// "+----------+-------------+-----------+",
/// "| bool_col | tinyint_col | float_col |",
/// "+----------+-------------+-----------+",
/// "| true | 0 | 0.0 |",
/// "| false | 1 | 1.1 |",
/// "| true | 0 | 0.0 |",
/// "| false | 1 | 1.1 |",
/// "| true | 0 | 0.0 |",
/// "| false | 1 | 1.1 |",
/// "| true | 0 | 0.0 |",
/// "| false | 1 | 1.1 |",
/// "+----------+-------------+-----------+",
/// ],
/// );
///
/// // The results have 8 rows in total, so with a batch size of 3 we expect
/// // 3 batches: two with 3 rows each and one with 2 rows.
/// assert_eq!(results.len(), 3);
/// # }
/// ```
pub async fn new(input: T) -> Result<Self> {
Self::new_with_options(input, Default::default()).await
}
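Beyond `with_batch_size` and `with_projection` shown in the examples above, the builder also accepts row-group selection and row filters. A rough sketch, assuming `with_row_groups`, `with_row_filter`, `ArrowPredicate`/`ArrowPredicateFn`, and `RowFilter` from `parquet::arrow::arrow_reader`; the file path and the trivial not-null predicate are placeholders:

```rust
use futures::TryStreamExt;
use parquet::arrow::arrow_reader::{ArrowPredicate, ArrowPredicateFn, RowFilter};
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = tokio::fs::File::open("data.parquet").await?; // placeholder path
    let builder = ParquetRecordBatchStreamBuilder::new(file).await?;
    let schema_descr = builder.metadata().file_metadata().schema_descr();

    // The predicate is evaluated against only the columns in its projection mask
    // (here the first root column); rows where it returns false are skipped
    // before the remaining columns are decoded.
    let predicate_mask = ProjectionMask::roots(schema_descr, [0]);
    let predicate = ArrowPredicateFn::new(predicate_mask, |batch| {
        // Trivial placeholder predicate: keep rows where the column is not null.
        arrow::compute::is_not_null(batch.column(0))
    });
    let predicates: Vec<Box<dyn ArrowPredicate>> = vec![Box::new(predicate)];

    let stream = builder
        // Only decode the first row group.
        .with_row_groups(vec![0])
        .with_row_filter(RowFilter::new(predicates))
        .build()?;
    let batches = stream.try_collect::<Vec<_>>().await?;
    println!("decoded {} batches", batches.len());
    Ok(())
}
```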

/// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
/// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided async source
/// and [`ArrowReaderOptions`]
pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
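For `new_with_options`, a short sketch of passing `ArrowReaderOptions` (here enabling the page index via `ArrowReaderOptions::new().with_page_index(true)`; the file path is a placeholder):

```rust
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = tokio::fs::File::open("data.parquet").await?; // placeholder path

    // Ask the reader to also load the page index, which enables finer-grained
    // (page-level) pruning when row filters or selections are applied later.
    let options = ArrowReaderOptions::new().with_page_index(true);
    let builder = ParquetRecordBatchStreamBuilder::new_with_options(file, options).await?;

    // The parsed metadata is available before any row data is read.
    println!("{} row groups", builder.metadata().num_row_groups());
    Ok(())
}
```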
@@ -352,6 +400,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
}

/// Read bloom filter for a column in a row group
///
/// Returns `None` if the column does not have a bloom filter
///
/// We should call this function after other forms of pruning, such as projection and predicate pushdown.
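A short sketch of how this bloom filter API might be used to prune a row group before decoding it. The method name `get_row_group_column_bloom_filter` and the probed value are assumptions, since the function signature is not visible in this hunk:

```rust
use parquet::arrow::ParquetRecordBatchStreamBuilder;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = tokio::fs::File::open("data.parquet").await?; // placeholder path
    let mut builder = ParquetRecordBatchStreamBuilder::new(file).await?;

    // Probe the bloom filter for column 0 of row group 0 before decoding any rows.
    // `None` means the writer did not store a bloom filter for this column chunk.
    if let Some(sbbf) = builder.get_row_group_column_bloom_filter(0, 0).await? {
        // Bloom filters can report false positives but never false negatives,
        // so `false` means the value is definitely absent from this row group.
        if !sbbf.check(&42_i64) {
            println!("row group 0 cannot contain 42, so it can be skipped");
        }
    }
    Ok(())
}
```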
@@ -415,6 +464,8 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
}

/// Build a new [`ParquetRecordBatchStream`]
///
/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
let num_row_groups = self.metadata.row_groups().len();
