diff --git a/src/batch/src/executor/row_seq_scan.rs b/src/batch/src/executor/row_seq_scan.rs index bf2fb9613b7eb..4c1261363a72b 100644 --- a/src/batch/src/executor/row_seq_scan.rs +++ b/src/batch/src/executor/row_seq_scan.rs @@ -33,7 +33,6 @@ use risingwave_storage::store::PrefetchOptions; use risingwave_storage::table::batch_table::storage_table::StorageTable; use risingwave_storage::table::{collect_data_chunk, TableDistribution}; use risingwave_storage::{dispatch_state_store, StateStore}; -use rw_futures_util::select_all; use crate::error::{BatchError, Result}; use crate::executor::{ @@ -319,28 +318,28 @@ impl RowSeqScanExecutor { } // Range Scan - let range_scans = select_all(range_scans.into_iter().map(|range_scan| { - let table = table.clone(); - let histogram = histogram.clone(); - Box::pin(Self::execute_range( - table, - range_scan, + // WARN: DO NOT use `select` to execute range scans concurrently + // it can consume too much memory if there're too many ranges. + for range in range_scans { + let stream = Self::execute_range( + table.clone(), + range, ordered, epoch.clone(), chunk_size, limit, - histogram, - )) - })); - #[for_await] - for chunk in range_scans { - let chunk = chunk?; - returned += chunk.cardinality() as u64; - yield chunk; - if let Some(limit) = &limit - && returned >= *limit - { - return Ok(()); + histogram.clone(), + ); + #[for_await] + for chunk in stream { + let chunk = chunk?; + returned += chunk.cardinality() as u64; + yield chunk; + if let Some(limit) = &limit + && returned >= *limit + { + return Ok(()); + } } } }