refactor: Remove spawn and channel inside arrow reader (#806)
Signed-off-by: Xuanwo <[email protected]>
Xuanwo authored Dec 24, 2024
Parent: 0e5a3c3 · Commit: f33628e
Showing 2 changed files with 35 additions and 52 deletions.
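
In short: `ArrowReader::read` previously spawned a background task per scan and pushed `RecordBatch`es through a bounded mpsc channel, keeping a cloned sender just to forward errors. After this change, `read` is an `async fn` that builds a lazy stream pipeline instead: each `FileScanTask` is mapped to a future that opens its data file, up to `concurrency_limit_data_files` of those futures run concurrently via `try_buffer_unordered`, and the per-file record-batch streams they produce are merged with `try_flatten_unordered`. `process_file_scan_task` now returns its stream rather than sending into a channel. A standalone sketch of the combinator pattern appears after the first block of reader.rs hunks below.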
crates/iceberg/src/arrow/reader.rs (29 additions, 51 deletions)
```diff
@@ -31,9 +31,8 @@ use arrow_schema::{
 use arrow_string::like::starts_with;
 use bytes::Bytes;
 use fnv::FnvHashSet;
-use futures::channel::mpsc::{channel, Sender};
 use futures::future::BoxFuture;
-use futures::{try_join, FutureExt, SinkExt, StreamExt, TryFutureExt, TryStreamExt};
+use futures::{try_join, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
 use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection};
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
@@ -48,7 +47,6 @@ use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
 use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
 use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
-use crate::runtime::spawn;
 use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
 use crate::spec::{Datum, PrimitiveType, Schema};
 use crate::utils::available_parallelism;
@@ -130,62 +128,41 @@ pub struct ArrowReader {
 impl ArrowReader {
     /// Take a stream of FileScanTasks and reads all the files.
     /// Returns a stream of Arrow RecordBatches containing the data from the files
-    pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
+    pub async fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
         let file_io = self.file_io.clone();
         let batch_size = self.batch_size;
         let concurrency_limit_data_files = self.concurrency_limit_data_files;
         let row_group_filtering_enabled = self.row_group_filtering_enabled;
         let row_selection_enabled = self.row_selection_enabled;
 
-        let (tx, rx) = channel(concurrency_limit_data_files);
-        let mut channel_for_error = tx.clone();
-
-        spawn(async move {
-            let result = tasks
-                .map(|task| Ok((task, file_io.clone(), tx.clone())))
-                .try_for_each_concurrent(
-                    concurrency_limit_data_files,
-                    |(file_scan_task, file_io, tx)| async move {
-                        match file_scan_task {
-                            Ok(task) => {
-                                let file_path = task.data_file_path.to_string();
-
-                                spawn(async move {
-                                    Self::process_file_scan_task(
-                                        task,
-                                        batch_size,
-                                        file_io,
-                                        tx,
-                                        row_group_filtering_enabled,
-                                        row_selection_enabled,
-                                    )
-                                    .await
-                                })
-                                .await
-                                .map_err(|e| e.with_context("file_path", file_path))
-                            }
-                            Err(err) => Err(err),
-                        }
-                    },
-                )
-                .await;
-
-            if let Err(error) = result {
-                let _ = channel_for_error.send(Err(error)).await;
-            }
-        });
+        let stream = tasks
+            .map_ok(move |task| {
+                let file_io = file_io.clone();
+
+                Self::process_file_scan_task(
+                    task,
+                    batch_size,
+                    file_io,
+                    row_group_filtering_enabled,
+                    row_selection_enabled,
+                )
+            })
+            .map_err(|err| {
+                Error::new(ErrorKind::Unexpected, "file scan task generate failed").with_source(err)
+            })
+            .try_buffer_unordered(concurrency_limit_data_files)
+            .try_flatten_unordered(concurrency_limit_data_files);
 
-        return Ok(rx.boxed());
+        Ok(Box::pin(stream) as ArrowRecordBatchStream)
     }
 
     async fn process_file_scan_task(
         task: FileScanTask,
         batch_size: Option<usize>,
         file_io: FileIO,
-        mut tx: Sender<Result<RecordBatch>>,
         row_group_filtering_enabled: bool,
         row_selection_enabled: bool,
-    ) -> Result<()> {
+    ) -> Result<ArrowRecordBatchStream> {
         // Get the metadata for the Parquet file we need to read and build
         // a reader for the data within
         let parquet_file = file_io.new_input(&task.data_file_path)?;
```
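
For readers unfamiliar with these combinators, below is a minimal self-contained sketch of the buffer-then-flatten pattern that the new `read` relies on, assuming only the `futures` crate. `fetch_rows`, `CONCURRENCY`, and the error type are illustrative stand-ins, not iceberg-rust APIs.

```rust
use futures::executor::block_on;
use futures::stream::{self, BoxStream, StreamExt, TryStreamExt};

type Result<T> = std::result::Result<T, std::io::Error>;

// Stand-in for concurrency_limit_data_files.
const CONCURRENCY: usize = 4;

// Stand-in for process_file_scan_task: one task resolves to a sub-stream.
async fn fetch_rows(task_id: u32) -> Result<BoxStream<'static, Result<u32>>> {
    Ok(stream::iter((0..3).map(move |i| Ok(task_id * 10 + i))).boxed())
}

fn main() -> Result<()> {
    block_on(async {
        let tasks = stream::iter([Ok::<_, std::io::Error>(1u32), Ok(2), Ok(3)]);

        let merged = tasks
            // Each Ok task becomes a future that resolves to a sub-stream.
            .map_ok(fetch_rows)
            // Drive up to CONCURRENCY of those futures at a time.
            .try_buffer_unordered(CONCURRENCY)
            // Interleave items from the sub-streams as they become ready.
            .try_flatten_unordered(CONCURRENCY);

        // Nothing above has done any work yet; polling drives the pipeline.
        let rows: Vec<u32> = merged.try_collect().await?;
        println!("{rows:?}");
        Ok(())
    })
}
```

Note that the pipeline is fully lazy: `read` returns as soon as the stream is assembled, and the concurrency limit only takes effect once the caller polls the returned stream.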
```diff
@@ -269,14 +246,15 @@ impl ArrowReader {
 
         // Build the batch stream and send all the RecordBatches that it generates
         // to the requester.
-        let mut record_batch_stream = record_batch_stream_builder.build()?;
-
-        while let Some(batch) = record_batch_stream.try_next().await? {
-            tx.send(record_batch_transformer.process_record_batch(batch))
-                .await?
-        }
-
-        Ok(())
+        let record_batch_stream =
+            record_batch_stream_builder
+                .build()?
+                .map(move |batch| match batch {
+                    Ok(batch) => record_batch_transformer.process_record_batch(batch),
+                    Err(err) => Err(err.into()),
+                });
+
+        Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
     }
 
     fn build_field_id_set_and_map(
```
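
The change to `process_file_scan_task` follows the same shape: rather than pulling batches in a loop and pushing each one into `tx`, it wraps the Parquet stream in a lazy `map` and returns the whole thing as a boxed trait-object stream. A rough sketch of that shape, under the same caveats; `Transformer` and `build_row_stream` are hypothetical stand-ins for `RecordBatchTransformer` and the builder code above:

```rust
use futures::executor::block_on;
use futures::stream::{self, BoxStream, StreamExt};

type Result<T> = std::result::Result<T, String>;
// Boxed trait-object stream, analogous to ArrowRecordBatchStream.
type RowStream = BoxStream<'static, Result<i64>>;

// Hypothetical stand-in for RecordBatchTransformer.
struct Transformer;

impl Transformer {
    fn process(&self, row: i64) -> Result<i64> {
        Ok(row * 2)
    }
}

fn build_row_stream() -> Result<RowStream> {
    let transformer = Transformer;
    let raw = stream::iter([Ok(1i64), Ok(2), Err("decode failed".to_string())]);

    // No row is transformed here; work happens only as the caller polls.
    let transformed = raw.map(move |row| match row {
        Ok(row) => transformer.process(row),
        Err(err) => Err(err),
    });

    Ok(Box::pin(transformed) as RowStream)
}

fn main() -> Result<()> {
    let mut rows = build_row_stream()?;
    block_on(async {
        while let Some(row) = rows.next().await {
            println!("{row:?}");
        }
    });
    Ok(())
}
```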
crates/iceberg/src/scan.rs (6 additions, 1 deletion)
```diff
@@ -421,7 +421,10 @@ impl TableScan {
             arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
         }
 
-        arrow_reader_builder.build().read(self.plan_files().await?)
+        arrow_reader_builder
+            .build()
+            .read(self.plan_files().await?)
+            .await
     }
 
     /// Returns a reference to the column names of the table scan.
```
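
Because `read` is now an `async fn`, call sites pick up an `.await` between building the reader and consuming the returned stream; the test updates below make the same adjustment.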
```diff
@@ -1410,12 +1413,14 @@ mod tests {
         let batch_stream = reader
             .clone()
             .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
+            .await
             .unwrap();
         let batche1: Vec<_> = batch_stream.try_collect().await.unwrap();
 
         let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
         let batch_stream = reader
             .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
+            .await
             .unwrap();
         let batche2: Vec<_> = batch_stream.try_collect().await.unwrap();
 
```
