Skip to content

Commit

Permalink
feat(batch): support spilling for the batch sort executor (#17362)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Jun 20, 2024
1 parent 5c0ecf3 commit 7585609
Show file tree
Hide file tree
Showing 5 changed files with 318 additions and 29 deletions.
7 changes: 6 additions & 1 deletion src/batch/benches/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@

pub mod utils;

use std::sync::Arc;

use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion};
use risingwave_batch::executor::{BoxedExecutor, SortExecutor};
use risingwave_batch::monitor::BatchSpillMetrics;
use risingwave_common::enable_jemalloc;
use risingwave_common::memory::MemoryContext;
use risingwave_common::types::DataType;
Expand Down Expand Up @@ -57,10 +60,12 @@ fn create_order_by_executor(

Box::new(SortExecutor::new(
child,
column_orders,
Arc::new(column_orders),
"SortExecutor".into(),
CHUNK_SIZE,
MemoryContext::none(),
false,
BatchSpillMetrics::for_test(),
))
}

Expand Down
4 changes: 4 additions & 0 deletions src/batch/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,10 @@ impl<K: HashKey + Send + Sync> HashAggExecutor<K> {
// Finally, we would get e.g. 20 partitions. Each partition should contain a portion of the original hash table and input data.
// A sub HashAggExecutor would be used to consume each partition one by one.
// If memory is still not enough in the sub HashAggExecutor, it will spill its hash table and input recursively.
info!(
"batch hash agg executor {} starts to spill out",
&self.identity
);
let mut agg_spill_manager = AggSpillManager::new(
&self.identity,
DEFAULT_SPILL_PARTITION_NUM,
Expand Down
4 changes: 4 additions & 0 deletions src/batch/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,10 @@ impl<K: HashKey> HashJoinExecutor<K> {
// Finally, we would get e.g. 20 partitions. Each partition should contain a portion of the original build side input and probe side input data.
// A sub HashJoinExecutor would be used to consume each partition one by one.
// If memory is still not enough in the sub HashJoinExecutor, it will spill its inputs recursively.
info!(
"batch hash join executor {} starts to spill out",
&self.identity
);
let mut join_spill_manager = JoinSpillManager::new(
&self.identity,
DEFAULT_SPILL_PARTITION_NUM,
Expand Down
7 changes: 6 additions & 1 deletion src/batch/src/executor/join/local_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,8 @@ impl HashKeyDispatcher for LocalLookupJoinExecutorArgs {

#[cfg(test)]
mod tests {
use std::sync::Arc;

use risingwave_common::array::{DataChunk, DataChunkTestExt};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::hash::HashKeyDispatcher;
Expand All @@ -502,6 +504,7 @@ mod tests {
diff_executor_output, FakeInnerSideExecutorBuilder, MockExecutor,
};
use crate::executor::{BoxedExecutor, SortExecutor};
use crate::monitor::BatchSpillMetrics;
use crate::task::ShutdownToken;

const CHUNK_SIZE: usize = 1024;
Expand Down Expand Up @@ -594,10 +597,12 @@ mod tests {

Box::new(SortExecutor::new(
child,
column_orders,
Arc::new(column_orders),
"SortExecutor".into(),
CHUNK_SIZE,
MemoryContext::none(),
false,
BatchSpillMetrics::for_test(),
))
}

Expand Down
Loading

0 comments on commit 7585609

Please sign in to comment.