From 6cef43a540461c80245ff8ebd34ea553719a01c8 Mon Sep 17 00:00:00 2001
From: Xuanwo <github@xuanwo.io>
Date: Mon, 16 Dec 2024 15:50:05 +0800
Subject: [PATCH] refactor: Remove spawn and channel inside arrow reader

Signed-off-by: Xuanwo <github@xuanwo.io>
---
 crates/iceberg/src/arrow/reader.rs | 80 +++++++++++-------------------
 crates/iceberg/src/scan.rs         |  7 ++-
 2 files changed, 35 insertions(+), 52 deletions(-)

diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index 16b9468c1..b4e15821f 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -31,9 +31,8 @@ use arrow_schema::{
 use arrow_string::like::starts_with;
 use bytes::Bytes;
 use fnv::FnvHashSet;
-use futures::channel::mpsc::{channel, Sender};
 use futures::future::BoxFuture;
-use futures::{try_join, FutureExt, SinkExt, StreamExt, TryFutureExt, TryStreamExt};
+use futures::{try_join, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
 use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection};
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
@@ -48,7 +47,6 @@ use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
 use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
 use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
-use crate::runtime::spawn;
 use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
 use crate::spec::{Datum, PrimitiveType, Schema};
 use crate::utils::available_parallelism;
@@ -130,62 +128,41 @@ pub struct ArrowReader {
 impl ArrowReader {
     /// Take a stream of FileScanTasks and reads all the files.
     /// Returns a stream of Arrow RecordBatches containing the data from the files
-    pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
+    pub async fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
         let file_io = self.file_io.clone();
         let batch_size = self.batch_size;
         let concurrency_limit_data_files = self.concurrency_limit_data_files;
         let row_group_filtering_enabled = self.row_group_filtering_enabled;
         let row_selection_enabled = self.row_selection_enabled;
 
-        let (tx, rx) = channel(concurrency_limit_data_files);
-        let mut channel_for_error = tx.clone();
-
-        spawn(async move {
-            let result = tasks
-                .map(|task| Ok((task, file_io.clone(), tx.clone())))
-                .try_for_each_concurrent(
-                    concurrency_limit_data_files,
-                    |(file_scan_task, file_io, tx)| async move {
-                        match file_scan_task {
-                            Ok(task) => {
-                                let file_path = task.data_file_path.to_string();
-
-                                spawn(async move {
-                                    Self::process_file_scan_task(
-                                        task,
-                                        batch_size,
-                                        file_io,
-                                        tx,
-                                        row_group_filtering_enabled,
-                                        row_selection_enabled,
-                                    )
-                                    .await
-                                })
-                                .await
-                                .map_err(|e| e.with_context("file_path", file_path))
-                            }
-                            Err(err) => Err(err),
-                        }
-                    },
-                )
-                .await;
+        let stream = tasks
+            .map_ok(move |task| {
+                let file_io = file_io.clone();
 
-            if let Err(error) = result {
-                let _ = channel_for_error.send(Err(error)).await;
-            }
-        });
+                Self::process_file_scan_task(
+                    task,
+                    batch_size,
+                    file_io,
+                    row_group_filtering_enabled,
+                    row_selection_enabled,
+                )
+            })
+            .map_err(|err| {
+                Error::new(ErrorKind::Unexpected, "file scan task generate failed").with_source(err)
+            })
+            .try_buffer_unordered(concurrency_limit_data_files)
+            .try_flatten_unordered(concurrency_limit_data_files);
 
-        return Ok(rx.boxed());
+        Ok(Box::pin(stream) as ArrowRecordBatchStream)
     }
 
     async fn process_file_scan_task(
         task: FileScanTask,
         batch_size: Option<usize>,
         file_io: FileIO,
-        mut tx: Sender<Result<RecordBatch>>,
         row_group_filtering_enabled: bool,
         row_selection_enabled: bool,
-    ) -> Result<()> {
+    ) -> Result<ArrowRecordBatchStream> {
         // Get the metadata for the Parquet file we need to read and build
         // a reader for the data within
         let parquet_file = file_io.new_input(&task.data_file_path)?;
@@ -269,14 +246,15 @@ impl ArrowReader {
 
         // Build the batch stream and send all the RecordBatches that it generates
         // to the requester.
-        let mut record_batch_stream = record_batch_stream_builder.build()?;
-
-        while let Some(batch) = record_batch_stream.try_next().await? {
-            tx.send(record_batch_transformer.process_record_batch(batch))
-                .await?
-        }
-
-        Ok(())
+        let record_batch_stream =
+            record_batch_stream_builder
+                .build()?
+                .map(move |batch| match batch {
+                    Ok(batch) => record_batch_transformer.process_record_batch(batch),
+                    Err(err) => Err(err.into()),
+                });
+
+        Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
     }
 
     fn build_field_id_set_and_map(
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 89cc21bbf..89df97527 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -422,7 +422,10 @@ impl TableScan {
             arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
         }
 
-        arrow_reader_builder.build().read(self.plan_files().await?)
+        arrow_reader_builder
+            .build()
+            .read(self.plan_files().await?)
+            .await
     }
 
     /// Returns a reference to the column names of the table scan.
@@ -1404,12 +1407,14 @@ mod tests {
         let batch_stream = reader
             .clone()
             .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
+            .await
             .unwrap();
         let batche1: Vec<_> = batch_stream.try_collect().await.unwrap();
 
         let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
         let batch_stream = reader
             .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
+            .await
             .unwrap();
         let batche2: Vec<_> = batch_stream.try_collect().await.unwrap();
 
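
Note (editor's sketch, not part of the patch): the refactor replaces the
spawn-plus-mpsc fan-out with plain stream combinators. Each scan task is
mapped to a future that yields a sub-stream of record batches,
try_buffer_unordered bounds how many of those futures run at once, and
try_flatten_unordered merges the resulting sub-streams, so errors flow
in-band instead of through a cloned error channel. The self-contained
example below shows the same shape using only the futures crate;
fetch_batches and the integer tasks are hypothetical stand-ins for
process_file_scan_task and FileScanTask, not APIs from this codebase.

    // Requires futures = "0.3" (recent enough to have try_flatten_unordered).
    use futures::executor::block_on;
    use futures::stream::{self, BoxStream, StreamExt, TryStreamExt};

    type Result<T> = std::result::Result<T, String>;

    // Hypothetical stand-in for process_file_scan_task: one task becomes
    // a boxed stream of "record batches" (plain u32s here).
    async fn fetch_batches(task: u32) -> Result<BoxStream<'static, Result<u32>>> {
        Ok(stream::iter((0..3).map(move |i| Ok(task * 10 + i))).boxed())
    }

    fn main() -> Result<()> {
        let concurrency = 4;
        // A fallible stream of tasks, playing the role of FileScanTaskStream.
        let tasks = stream::iter(vec![Ok(1u32), Ok(2), Ok(3)]);

        let merged: Vec<u32> = block_on(
            tasks
                // Each Ok task becomes a future producing a sub-stream.
                .map_ok(fetch_batches)
                // At most `concurrency` task futures are in flight at once.
                .try_buffer_unordered(concurrency)
                // Merge up to `concurrency` sub-streams; items interleave,
                // so cross-task order is not preserved (matching the patch).
                .try_flatten_unordered(concurrency)
                .try_collect(),
        )?;

        println!("{merged:?}");
        Ok(())
    }

One consequence worth noting: nothing is spawned onto a runtime anymore, so
backpressure now comes from the consumer polling the merged stream rather
than from the channel's capacity, and a slow consumer simply pauses the
readers instead of filling a buffer.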