Skip to content

Commit

Permalink
make batch range scan sequentially
Browse files Browse the repository at this point in the history
Signed-off-by: Runji Wang <[email protected]>
  • Loading branch information
wangrunji0408 committed Mar 12, 2024
1 parent d527ba8 commit b3a11a3
Showing 1 changed file with 18 additions and 19 deletions.
37 changes: 18 additions & 19 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::{collect_data_chunk, TableDistribution};
use risingwave_storage::{dispatch_state_store, StateStore};
use rw_futures_util::select_all;

use crate::error::{BatchError, Result};
use crate::executor::{
Expand Down Expand Up @@ -319,28 +318,28 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
}

// Range Scan
let range_scans = select_all(range_scans.into_iter().map(|range_scan| {
let table = table.clone();
let histogram = histogram.clone();
Box::pin(Self::execute_range(
table,
range_scan,
// WARN: DO NOT use `select` to execute range scans concurrently
// it can consume too much memory if there're too many ranges.
for range in range_scans {
let stream = Self::execute_range(
table.clone(),
range,
ordered,
epoch.clone(),
chunk_size,
limit,
histogram,
))
}));
#[for_await]
for chunk in range_scans {
let chunk = chunk?;
returned += chunk.cardinality() as u64;
yield chunk;
if let Some(limit) = &limit
&& returned >= *limit
{
return Ok(());
histogram.clone(),
);
#[for_await]
for chunk in stream {
let chunk = chunk?;
returned += chunk.cardinality() as u64;
yield chunk;
if let Some(limit) = &limit
&& returned >= *limit
{
return Ok(());
}
}
}
}
Expand Down

0 comments on commit b3a11a3

Please sign in to comment.