Skip to content

Commit

Permalink
test(batch): add unit test for spill sort (#17394)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Jun 24, 2024
1 parent 57798fa commit 4d03c48
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 32 deletions.
2 changes: 1 addition & 1 deletion src/batch/benches/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ fn create_order_by_executor(
"SortExecutor".into(),
CHUNK_SIZE,
MemoryContext::none(),
false,
None,
BatchSpillMetrics::for_test(),
))
}
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::executor::{
};
use crate::monitor::BatchSpillMetrics;
use crate::spill::spill_op::{
SpillBuildHasher, SpillOp, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY,
SpillBackend, SpillBuildHasher, SpillOp, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY,
};
use crate::task::{BatchTaskContext, ShutdownToken, TaskId};

Expand Down Expand Up @@ -325,7 +325,7 @@ impl AggSpillManager {
) -> Result<Self> {
let suffix_uuid = uuid::Uuid::new_v4();
let dir = format!("/{}-{}/", agg_identity, suffix_uuid);
let op = SpillOp::create(dir)?;
let op = SpillOp::create(dir, SpillBackend::Disk)?;
let agg_state_writers = Vec::with_capacity(partition_num);
let agg_state_chunk_builder = Vec::with_capacity(partition_num);
let input_writers = Vec::with_capacity(partition_num);
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::executor::{
use crate::monitor::BatchSpillMetrics;
use crate::risingwave_common::hash::NullBitmap;
use crate::spill::spill_op::{
SpillBuildHasher, SpillOp, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY,
SpillBackend, SpillBuildHasher, SpillOp, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY,
};
use crate::task::{BatchTaskContext, ShutdownToken};

Expand Down Expand Up @@ -273,7 +273,7 @@ impl JoinSpillManager {
) -> Result<Self> {
let suffix_uuid = uuid::Uuid::new_v4();
let dir = format!("/{}-{}/", join_identity, suffix_uuid);
let op = SpillOp::create(dir)?;
let op = SpillOp::create(dir, SpillBackend::Disk)?;
let probe_side_writers = Vec::with_capacity(partition_num);
let build_side_writers = Vec::with_capacity(partition_num);
let probe_side_chunk_builders = Vec::with_capacity(partition_num);
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/join/local_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ mod tests {
"SortExecutor".into(),
CHUNK_SIZE,
MemoryContext::none(),
false,
None,
BatchSpillMetrics::for_test(),
))
}
Expand Down
99 changes: 81 additions & 18 deletions src/batch/src/executor/order_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ use super::{
use crate::error::{BatchError, Result};
use crate::executor::merge_sort::MergeSortExecutor;
use crate::monitor::BatchSpillMetrics;
use crate::spill::spill_op::{SpillOp, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY};
use crate::spill::spill_op::SpillBackend::Disk;
use crate::spill::spill_op::{
SpillBackend, SpillOp, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY,
};
use crate::task::BatchTaskContext;

/// Sort Executor
Expand All @@ -53,7 +56,7 @@ pub struct SortExecutor {
schema: Schema,
chunk_size: usize,
mem_context: MemoryContext,
enable_spill: bool,
spill_backend: Option<SpillBackend>,
spill_metrics: Arc<BatchSpillMetrics>,
/// The upper bound of memory usage for this executor.
memory_upper_bound: Option<u64>,
Expand Down Expand Up @@ -97,7 +100,11 @@ impl BoxedExecutorBuilder for SortExecutor {
identity.clone(),
source.context.get_config().developer.chunk_size,
source.context.create_executor_mem_context(identity),
source.context.get_config().enable_spill,
if source.context.get_config().enable_spill {
Some(Disk)
} else {
None
},
source.context.spill_metrics(),
)))
}
Expand All @@ -124,7 +131,7 @@ impl SortExecutor {
let chunk_estimated_heap_size = chunk.estimated_heap_size();
chunks.push(chunk);
if !self.mem_context.add(chunk_estimated_heap_size as i64) && check_memory {
if self.enable_spill {
if self.spill_backend.is_some() {
need_to_spill = true;
break;
} else {
Expand All @@ -149,7 +156,7 @@ impl SortExecutor {
.map(|(row_id, row)| (chunk.row_at_unchecked_vis(row_id), row)),
);
if !self.mem_context.add(chunk_estimated_heap_size as i64) && check_memory {
if self.enable_spill {
if self.spill_backend.is_some() {
need_to_spill = true;
break;
} else {
Expand All @@ -167,6 +174,7 @@ impl SortExecutor {
// If memory is still not enough in the sub SortExecutor, it will spill its inputs recursively.
info!("batch sort executor {} starts to spill out", &self.identity);
let mut sort_spill_manager = SortSpillManager::new(
self.spill_backend.clone().unwrap(),
&self.identity,
DEFAULT_SPILL_PARTITION_NUM,
child_schema.data_types(),
Expand Down Expand Up @@ -206,7 +214,7 @@ impl SortExecutor {
format!("{}-sub{}", self.identity.clone(), i),
self.chunk_size,
self.mem_context.clone(),
self.enable_spill,
self.spill_backend.clone(),
self.spill_metrics.clone(),
Some(partition_size),
);
Expand Down Expand Up @@ -255,7 +263,7 @@ impl SortExecutor {
identity: String,
chunk_size: usize,
mem_context: MemoryContext,
enable_spill: bool,
spill_backend: Option<SpillBackend>,
spill_metrics: Arc<BatchSpillMetrics>,
) -> Self {
Self::new_inner(
Expand All @@ -264,7 +272,7 @@ impl SortExecutor {
identity,
chunk_size,
mem_context,
enable_spill,
spill_backend,
spill_metrics,
None,
)
Expand All @@ -276,7 +284,7 @@ impl SortExecutor {
identity: String,
chunk_size: usize,
mem_context: MemoryContext,
enable_spill: bool,
spill_backend: Option<SpillBackend>,
spill_metrics: Arc<BatchSpillMetrics>,
memory_upper_bound: Option<u64>,
) -> Self {
Expand All @@ -288,7 +296,7 @@ impl SortExecutor {
schema,
chunk_size,
mem_context,
enable_spill,
spill_backend,
spill_metrics,
memory_upper_bound,
}
Expand Down Expand Up @@ -322,6 +330,7 @@ struct SortSpillManager {

impl SortSpillManager {
fn new(
spill_backend: SpillBackend,
agg_identity: &String,
partition_num: usize,
child_data_types: Vec<DataType>,
Expand All @@ -330,7 +339,7 @@ impl SortSpillManager {
) -> Result<Self> {
let suffix_uuid = uuid::Uuid::new_v4();
let dir = format!("/{}-{}/", agg_identity, suffix_uuid);
let op = SpillOp::create(dir)?;
let op = SpillOp::create(dir, spill_backend)?;
let input_writers = Vec::with_capacity(partition_num);
let input_chunk_builders = Vec::with_capacity(partition_num);
Ok(Self {
Expand Down Expand Up @@ -458,7 +467,7 @@ mod tests {
"SortExecutor2".to_string(),
CHUNK_SIZE,
MemoryContext::none(),
false,
None,
BatchSpillMetrics::for_test(),
));
let fields = &order_by_executor.schema().fields;
Expand Down Expand Up @@ -510,7 +519,7 @@ mod tests {
"SortExecutor2".to_string(),
CHUNK_SIZE,
MemoryContext::none(),
false,
None,
BatchSpillMetrics::for_test(),
));
let fields = &order_by_executor.schema().fields;
Expand Down Expand Up @@ -562,7 +571,7 @@ mod tests {
"SortExecutor2".to_string(),
CHUNK_SIZE,
MemoryContext::none(),
false,
None,
BatchSpillMetrics::for_test(),
));
let fields = &order_by_executor.schema().fields;
Expand Down Expand Up @@ -639,7 +648,7 @@ mod tests {
"SortExecutor".to_string(),
CHUNK_SIZE,
MemoryContext::none(),
false,
None,
BatchSpillMetrics::for_test(),
));

Expand Down Expand Up @@ -721,7 +730,7 @@ mod tests {
"SortExecutor".to_string(),
CHUNK_SIZE,
MemoryContext::none(),
false,
None,
BatchSpillMetrics::for_test(),
));

Expand Down Expand Up @@ -829,7 +838,7 @@ mod tests {
"SortExecutor".to_string(),
CHUNK_SIZE,
MemoryContext::none(),
false,
None,
BatchSpillMetrics::for_test(),
));

Expand Down Expand Up @@ -979,12 +988,66 @@ mod tests {
"SortExecutor".to_string(),
CHUNK_SIZE,
MemoryContext::none(),
false,
None,
BatchSpillMetrics::for_test(),
));

let mut stream = order_by_executor.execute();
let res = stream.next().await;
assert_eq!(res.unwrap().unwrap(), output_chunk)
}

#[tokio::test]
async fn test_spill_out() {
let schema = Schema {
fields: vec![
Field::unnamed(DataType::Float32),
Field::unnamed(DataType::Float64),
],
};
let mut mock_executor = MockExecutor::new(schema);
mock_executor.add(DataChunk::from_pretty(
" f F
-2.2 3.3
-1.1 2.2
1.1 1.1
2.2 -1.1
3.3 -2.2",
));
let column_orders = vec![
ColumnOrder {
column_index: 1,
order_type: OrderType::ascending(),
},
ColumnOrder {
column_index: 0,
order_type: OrderType::ascending(),
},
];
let order_by_executor = Box::new(SortExecutor::new(
Box::new(mock_executor),
Arc::new(column_orders),
"SortExecutor2".to_string(),
CHUNK_SIZE,
MemoryContext::for_spill_test(),
Some(SpillBackend::Memory),
BatchSpillMetrics::for_test(),
));
let fields = &order_by_executor.schema().fields;
assert_eq!(fields[0].data_type, DataType::Float32);
assert_eq!(fields[1].data_type, DataType::Float64);

let mut stream = order_by_executor.execute();
let res = stream.next().await;
assert!(res.is_some());
if let Some(res) = res {
let res = res.unwrap();
let col0 = res.column_at(0);
assert_eq!(col0.as_float32().value_at(0), Some(3.3.into()));
assert_eq!(col0.as_float32().value_at(1), Some(2.2.into()));
assert_eq!(col0.as_float32().value_at(2), Some(1.1.into()));
assert_eq!(col0.as_float32().value_at(3), Some((-1.1).into()));
assert_eq!(col0.as_float32().value_at(4), Some((-2.2).into()));
}
}
}
33 changes: 25 additions & 8 deletions src/batch/src/spill/spill_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use anyhow::anyhow;
use futures_async_stream::try_stream;
use futures_util::AsyncReadExt;
use opendal::layers::RetryLayer;
use opendal::services::Fs;
use opendal::services::{Fs, Memory};
use opendal::Operator;
use prost::Message;
use risingwave_common::array::DataChunk;
Expand All @@ -39,25 +39,42 @@ const RW_MANAGED_SPILL_DIR: &str = "/rw_batch_spill/";
const DEFAULT_IO_BUFFER_SIZE: usize = 256 * 1024;
const DEFAULT_IO_CONCURRENT_TASK: usize = 8;

/// Selects where a spilling executor writes its spilled partitions
/// (see `SpillOp::create`).
#[derive(Clone)]
pub enum SpillBackend {
    /// Spill to the local filesystem (the `Fs` opendal service).
    Disk,
    /// Keep spilled data in memory (the `Memory` opendal service).
    /// Only for testing purposes.
    Memory,
}

/// `SpillOp` is used to manage the spill directory of the spilling executor and it will drop the directory with a RAII style.
pub struct SpillOp {
    /// The underlying opendal operator, rooted at this executor's spill directory.
    pub op: Operator,
}

impl SpillOp {
pub fn create(path: String) -> Result<SpillOp> {
pub fn create(path: String, spill_backend: SpillBackend) -> Result<SpillOp> {
assert!(path.ends_with('/'));

let spill_dir =
std::env::var(RW_BATCH_SPILL_DIR_ENV).unwrap_or_else(|_| DEFAULT_SPILL_DIR.to_string());
let root = format!("/{}/{}/{}/", spill_dir, RW_MANAGED_SPILL_DIR, path);

let mut builder = Fs::default();
builder.root(&root);

let op: Operator = Operator::new(builder)?
.layer(RetryLayer::default())
.finish();
let op = match spill_backend {
SpillBackend::Disk => {
let mut builder = Fs::default();
builder.root(&root);
Operator::new(builder)?
.layer(RetryLayer::default())
.finish()
}
SpillBackend::Memory => {
let mut builder = Memory::default();
builder.root(&root);
Operator::new(builder)?
.layer(RetryLayer::default())
.finish()
}
};
Ok(SpillOp { op })
}

Expand Down
4 changes: 4 additions & 0 deletions src/common/src/memory/mem_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ impl MemoryContext {
Self::new_with_mem_limit(None, counter, mem_limit)
}

    /// Creates a `MemoryContext` for spill tests: no parent context, a fresh
    /// counter, and a memory limit of 0 — so any positive `add()` exceeds the
    /// limit and returns `false`, forcing executors down their spill path.
    pub fn for_spill_test() -> Self {
        Self::new_with_mem_limit(None, TrAdderAtomic::new(0), 0)
    }

/// Add `bytes` memory usage. Pass negative value to decrease memory usage.
/// Returns `false` if the memory usage exceeds the limit.
pub fn add(&self, bytes: i64) -> bool {
Expand Down

0 comments on commit 4d03c48

Please sign in to comment.