refactor: Remove spawn and channel inside arrow reader #806

Merged: 1 commit, Dec 24, 2024
crates/iceberg/src/arrow/reader.rs (29 additions & 51 deletions)
@@ -31,9 +31,8 @@ use arrow_schema::{
 use arrow_string::like::starts_with;
 use bytes::Bytes;
 use fnv::FnvHashSet;
-use futures::channel::mpsc::{channel, Sender};
 use futures::future::BoxFuture;
-use futures::{try_join, FutureExt, SinkExt, StreamExt, TryFutureExt, TryStreamExt};
+use futures::{try_join, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
 use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection};
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
@@ -48,7 +47,6 @@ use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
 use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
 use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
-use crate::runtime::spawn;
 use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
 use crate::spec::{Datum, PrimitiveType, Schema};
 use crate::utils::available_parallelism;
@@ -130,62 +128,41 @@ pub struct ArrowReader {
 impl ArrowReader {
     /// Take a stream of FileScanTasks and reads all the files.
     /// Returns a stream of Arrow RecordBatches containing the data from the files
-    pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
+    pub async fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
         let file_io = self.file_io.clone();
         let batch_size = self.batch_size;
         let concurrency_limit_data_files = self.concurrency_limit_data_files;
         let row_group_filtering_enabled = self.row_group_filtering_enabled;
         let row_selection_enabled = self.row_selection_enabled;
 
-        let (tx, rx) = channel(concurrency_limit_data_files);
-        let mut channel_for_error = tx.clone();
-
-        spawn(async move {
-            let result = tasks
-                .map(|task| Ok((task, file_io.clone(), tx.clone())))
-                .try_for_each_concurrent(
-                    concurrency_limit_data_files,
-                    |(file_scan_task, file_io, tx)| async move {
-                        match file_scan_task {
-                            Ok(task) => {
-                                let file_path = task.data_file_path.to_string();
-
-                                spawn(async move {
-                                    Self::process_file_scan_task(
-                                        task,
-                                        batch_size,
-                                        file_io,
-                                        tx,
-                                        row_group_filtering_enabled,
-                                        row_selection_enabled,
-                                    )
-                                    .await
-                                })
-                                .await
-                                .map_err(|e| e.with_context("file_path", file_path))
-                            }
-                            Err(err) => Err(err),
-                        }
-                    },
-                )
-                .await;
+        let stream = tasks
+            .map_ok(move |task| {
+                let file_io = file_io.clone();
 
-            if let Err(error) = result {
-                let _ = channel_for_error.send(Err(error)).await;
-            }
-        });
+                Self::process_file_scan_task(
+                    task,
+                    batch_size,
+                    file_io,
+                    row_group_filtering_enabled,
+                    row_selection_enabled,
+                )
+            })
+            .map_err(|err| {
+                Error::new(ErrorKind::Unexpected, "file scan task generate failed").with_source(err)
+            })
+            .try_buffer_unordered(concurrency_limit_data_files)
+            .try_flatten_unordered(concurrency_limit_data_files);
 
-        return Ok(rx.boxed());
+        Ok(Box::pin(stream) as ArrowRecordBatchStream)
     }
 
     async fn process_file_scan_task(
         task: FileScanTask,
         batch_size: Option<usize>,
         file_io: FileIO,
-        mut tx: Sender<Result<RecordBatch>>,
         row_group_filtering_enabled: bool,
         row_selection_enabled: bool,
-    ) -> Result<()> {
+    ) -> Result<ArrowRecordBatchStream> {
         // Get the metadata for the Parquet file we need to read and build
         // a reader for the data within
         let parquet_file = file_io.new_input(&task.data_file_path)?;
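This hunk is the core of the refactor: instead of spawning tasks that push RecordBatches into an mpsc channel, `read` now builds a single stream pipeline. `map_ok` wraps each task in a not-yet-awaited future, `try_buffer_unordered` polls up to `concurrency_limit_data_files` of those futures at once, and `try_flatten_unordered` merges the resulting per-file batch streams under the same bound. A minimal, self-contained sketch of that pattern, using only the `futures` crate and toy types in place of `FileScanTask`/`RecordBatch` (`batches_for` is a hypothetical stand-in for `process_file_scan_task`, not the iceberg API):

```rust
use futures::executor::block_on;
use futures::stream::{self, Stream, TryStreamExt};

type Result<T> = std::result::Result<T, std::io::Error>;

// Stand-in for process_file_scan_task: an async fn whose future
// resolves to a per-task stream of results (three toy "batches").
async fn batches_for(task: u32) -> Result<impl Stream<Item = Result<u32>>> {
    Ok(stream::iter((0..3).map(move |i| Ok(task * 10 + i))))
}

fn main() -> Result<()> {
    let concurrency = 2;
    let tasks: Vec<Result<u32>> = vec![Ok(1), Ok(2), Ok(3)];
    let items: Vec<u32> = block_on(
        stream::iter(tasks)
            // Wrap each Ok task in its (not yet polled) future.
            .map_ok(batches_for)
            // Run at most `concurrency` task futures at a time.
            .try_buffer_unordered(concurrency)
            // Merge the inner batch streams, again with bounded fan-in.
            .try_flatten_unordered(concurrency)
            .try_collect(),
    )?;
    println!("{items:?}"); // e.g. [10, 11, 20, 12, ...]; order may vary
    Ok(())
}
```

With this shape, backpressure comes from the consumer polling the combined stream rather than from a bounded channel, and the first error simply terminates collection instead of needing a dedicated error channel.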
@@ -269,14 +246,15 @@ impl ArrowReader {
 
         // Build the batch stream and send all the RecordBatches that it generates
         // to the requester.
-        let mut record_batch_stream = record_batch_stream_builder.build()?;
-
-        while let Some(batch) = record_batch_stream.try_next().await? {
-            tx.send(record_batch_transformer.process_record_batch(batch))
-                .await?
-        }
-
-        Ok(())
+        let record_batch_stream =
+            record_batch_stream_builder
+                .build()?
+                .map(move |batch| match batch {
+                    Ok(batch) => record_batch_transformer.process_record_batch(batch),
+                    Err(err) => Err(err.into()),
+                });
+
+        Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
     }
 
     fn build_field_id_set_and_map(
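Correspondingly, `process_file_scan_task` loses its `Sender` parameter and returns the transformed batch stream itself, converting reader errors in place rather than sending them through a channel. A sketch of the same return-a-boxed-stream shape with illustrative types (the alias mirrors the shape of `ArrowRecordBatchStream`; nothing here is the iceberg API):

```rust
use std::pin::Pin;

use futures::executor::block_on;
use futures::stream::{self, Stream, StreamExt};

// Illustrative alias: a pinned, boxed, sendable stream of results.
type BoxedResultStream = Pin<Box<dyn Stream<Item = Result<i64, std::io::Error>> + Send>>;

fn transformed() -> Result<BoxedResultStream, std::io::Error> {
    let mapped = stream::iter(vec![Ok(1i64), Ok(2), Ok(3)]).map(|item| match item {
        // Run the per-item transformation on the happy path...
        Ok(n) => Ok(n * 2),
        // ...and forward errors downstream instead of aborting early.
        Err(err) => Err(err),
    });
    Ok(Box::pin(mapped) as BoxedResultStream)
}

fn main() -> Result<(), std::io::Error> {
    let items: Vec<_> = block_on(transformed()?.collect::<Vec<_>>());
    println!("{items:?}"); // [Ok(2), Ok(4), Ok(6)]
    Ok(())
}
```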
crates/iceberg/src/scan.rs (6 additions & 1 deletion)
@@ -422,7 +422,10 @@ impl TableScan {
             arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
         }
 
-        arrow_reader_builder.build().read(self.plan_files().await?)
+        arrow_reader_builder
+            .build()
+            .read(self.plan_files().await?)
+            .await
     }
 
     /// Returns a reference to the column names of the table scan.
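Since `read` is now `async` rather than handing work to a spawned background task, call sites gain one extra `.await` before the returned stream can be consumed, as the test changes below also show. A toy sketch of the new call shape, with a hypothetical `read` standing in for `ArrowReader::read`:

```rust
use futures::executor::block_on;
use futures::stream::{self, Stream, TryStreamExt};

// Hypothetical stand-in for ArrowReader::read: an async constructor
// returning a Result-wrapped stream rather than the stream directly.
async fn read(n: u32) -> Result<impl Stream<Item = Result<u32, std::io::Error>>, std::io::Error> {
    Ok(stream::iter((0..n).map(Ok)))
}

fn main() -> Result<(), std::io::Error> {
    // Two awaits now: one to build the stream, one to drain it.
    let batches: Vec<u32> = block_on(async { read(3).await?.try_collect().await })?;
    assert_eq!(batches, vec![0, 1, 2]);
    Ok(())
}
```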
@@ -1404,12 +1407,14 @@ mod tests {
         let batch_stream = reader
             .clone()
             .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
+            .await
             .unwrap();
         let batche1: Vec<_> = batch_stream.try_collect().await.unwrap();
 
         let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
         let batch_stream = reader
             .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
+            .await
             .unwrap();
         let batche2: Vec<_> = batch_stream.try_collect().await.unwrap();