Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(batch): add unit test for spill sort #17394

Merged
merged 2 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/batch/benches/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ fn create_order_by_executor(
"SortExecutor".into(),
CHUNK_SIZE,
MemoryContext::none(),
false,
None,
BatchSpillMetrics::for_test(),
))
}
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::executor::{
};
use crate::monitor::BatchSpillMetrics;
use crate::spill::spill_op::{
SpillBuildHasher, SpillOp, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY,
SpillBackend, SpillBuildHasher, SpillOp, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY,
};
use crate::task::{BatchTaskContext, ShutdownToken, TaskId};

Expand Down Expand Up @@ -325,7 +325,7 @@ impl AggSpillManager {
) -> Result<Self> {
let suffix_uuid = uuid::Uuid::new_v4();
let dir = format!("/{}-{}/", agg_identity, suffix_uuid);
let op = SpillOp::create(dir)?;
let op = SpillOp::create(dir, SpillBackend::Disk)?;
let agg_state_writers = Vec::with_capacity(partition_num);
let agg_state_chunk_builder = Vec::with_capacity(partition_num);
let input_writers = Vec::with_capacity(partition_num);
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::executor::{
use crate::monitor::BatchSpillMetrics;
use crate::risingwave_common::hash::NullBitmap;
use crate::spill::spill_op::{
SpillBuildHasher, SpillOp, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY,
SpillBackend, SpillBuildHasher, SpillOp, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY,
};
use crate::task::{BatchTaskContext, ShutdownToken};

Expand Down Expand Up @@ -273,7 +273,7 @@ impl JoinSpillManager {
) -> Result<Self> {
let suffix_uuid = uuid::Uuid::new_v4();
let dir = format!("/{}-{}/", join_identity, suffix_uuid);
let op = SpillOp::create(dir)?;
let op = SpillOp::create(dir, SpillBackend::Disk)?;
let probe_side_writers = Vec::with_capacity(partition_num);
let build_side_writers = Vec::with_capacity(partition_num);
let probe_side_chunk_builders = Vec::with_capacity(partition_num);
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/join/local_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ mod tests {
"SortExecutor".into(),
CHUNK_SIZE,
MemoryContext::none(),
false,
None,
BatchSpillMetrics::for_test(),
))
}
Expand Down
99 changes: 81 additions & 18 deletions src/batch/src/executor/order_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ use super::{
use crate::error::{BatchError, Result};
use crate::executor::merge_sort::MergeSortExecutor;
use crate::monitor::BatchSpillMetrics;
use crate::spill::spill_op::{SpillOp, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY};
use crate::spill::spill_op::SpillBackend::Disk;
use crate::spill::spill_op::{
SpillBackend, SpillOp, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY,
};
use crate::task::BatchTaskContext;

/// Sort Executor
Expand All @@ -53,7 +56,7 @@ pub struct SortExecutor {
schema: Schema,
chunk_size: usize,
mem_context: MemoryContext,
enable_spill: bool,
spill_backend: Option<SpillBackend>,
spill_metrics: Arc<BatchSpillMetrics>,
/// The upper bound of memory usage for this executor.
memory_upper_bound: Option<u64>,
Expand Down Expand Up @@ -97,7 +100,11 @@ impl BoxedExecutorBuilder for SortExecutor {
identity.clone(),
source.context.get_config().developer.chunk_size,
source.context.create_executor_mem_context(identity),
source.context.get_config().enable_spill,
if source.context.get_config().enable_spill {
Some(Disk)
} else {
None
},
source.context.spill_metrics(),
)))
}
Expand All @@ -124,7 +131,7 @@ impl SortExecutor {
let chunk_estimated_heap_size = chunk.estimated_heap_size();
chunks.push(chunk);
if !self.mem_context.add(chunk_estimated_heap_size as i64) && check_memory {
if self.enable_spill {
if self.spill_backend.is_some() {
need_to_spill = true;
break;
} else {
Expand All @@ -149,7 +156,7 @@ impl SortExecutor {
.map(|(row_id, row)| (chunk.row_at_unchecked_vis(row_id), row)),
);
if !self.mem_context.add(chunk_estimated_heap_size as i64) && check_memory {
if self.enable_spill {
if self.spill_backend.is_some() {
need_to_spill = true;
break;
} else {
Expand All @@ -167,6 +174,7 @@ impl SortExecutor {
// If memory is still not enough in the sub SortExecutor, it will spill its inputs recursively.
info!("batch sort executor {} starts to spill out", &self.identity);
let mut sort_spill_manager = SortSpillManager::new(
self.spill_backend.clone().unwrap(),
&self.identity,
DEFAULT_SPILL_PARTITION_NUM,
child_schema.data_types(),
Expand Down Expand Up @@ -206,7 +214,7 @@ impl SortExecutor {
format!("{}-sub{}", self.identity.clone(), i),
self.chunk_size,
self.mem_context.clone(),
self.enable_spill,
self.spill_backend.clone(),
self.spill_metrics.clone(),
Some(partition_size),
);
Expand Down Expand Up @@ -255,7 +263,7 @@ impl SortExecutor {
identity: String,
chunk_size: usize,
mem_context: MemoryContext,
enable_spill: bool,
spill_backend: Option<SpillBackend>,
spill_metrics: Arc<BatchSpillMetrics>,
) -> Self {
Self::new_inner(
Expand All @@ -264,7 +272,7 @@ impl SortExecutor {
identity,
chunk_size,
mem_context,
enable_spill,
spill_backend,
spill_metrics,
None,
)
Expand All @@ -276,7 +284,7 @@ impl SortExecutor {
identity: String,
chunk_size: usize,
mem_context: MemoryContext,
enable_spill: bool,
spill_backend: Option<SpillBackend>,
spill_metrics: Arc<BatchSpillMetrics>,
memory_upper_bound: Option<u64>,
) -> Self {
Expand All @@ -288,7 +296,7 @@ impl SortExecutor {
schema,
chunk_size,
mem_context,
enable_spill,
spill_backend,
spill_metrics,
memory_upper_bound,
}
Expand Down Expand Up @@ -322,6 +330,7 @@ struct SortSpillManager {

impl SortSpillManager {
fn new(
spill_backend: SpillBackend,
agg_identity: &String,
partition_num: usize,
child_data_types: Vec<DataType>,
Expand All @@ -330,7 +339,7 @@ impl SortSpillManager {
) -> Result<Self> {
let suffix_uuid = uuid::Uuid::new_v4();
let dir = format!("/{}-{}/", agg_identity, suffix_uuid);
let op = SpillOp::create(dir)?;
let op = SpillOp::create(dir, spill_backend)?;
let input_writers = Vec::with_capacity(partition_num);
let input_chunk_builders = Vec::with_capacity(partition_num);
Ok(Self {
Expand Down Expand Up @@ -458,7 +467,7 @@ mod tests {
"SortExecutor2".to_string(),
CHUNK_SIZE,
MemoryContext::none(),
false,
None,
BatchSpillMetrics::for_test(),
));
let fields = &order_by_executor.schema().fields;
Expand Down Expand Up @@ -510,7 +519,7 @@ mod tests {
"SortExecutor2".to_string(),
CHUNK_SIZE,
MemoryContext::none(),
false,
None,
BatchSpillMetrics::for_test(),
));
let fields = &order_by_executor.schema().fields;
Expand Down Expand Up @@ -562,7 +571,7 @@ mod tests {
"SortExecutor2".to_string(),
CHUNK_SIZE,
MemoryContext::none(),
false,
None,
BatchSpillMetrics::for_test(),
));
let fields = &order_by_executor.schema().fields;
Expand Down Expand Up @@ -639,7 +648,7 @@ mod tests {
"SortExecutor".to_string(),
CHUNK_SIZE,
MemoryContext::none(),
false,
None,
BatchSpillMetrics::for_test(),
));

Expand Down Expand Up @@ -721,7 +730,7 @@ mod tests {
"SortExecutor".to_string(),
CHUNK_SIZE,
MemoryContext::none(),
false,
None,
BatchSpillMetrics::for_test(),
));

Expand Down Expand Up @@ -829,7 +838,7 @@ mod tests {
"SortExecutor".to_string(),
CHUNK_SIZE,
MemoryContext::none(),
false,
None,
BatchSpillMetrics::for_test(),
));

Expand Down Expand Up @@ -979,12 +988,66 @@ mod tests {
"SortExecutor".to_string(),
CHUNK_SIZE,
MemoryContext::none(),
false,
None,
BatchSpillMetrics::for_test(),
));

let mut stream = order_by_executor.execute();
let res = stream.next().await;
assert_eq!(res.unwrap().unwrap(), output_chunk)
}

#[tokio::test]
async fn test_spill_out() {
let schema = Schema {
fields: vec![
Field::unnamed(DataType::Float32),
Field::unnamed(DataType::Float64),
],
};
let mut mock_executor = MockExecutor::new(schema);
mock_executor.add(DataChunk::from_pretty(
" f F
-2.2 3.3
-1.1 2.2
1.1 1.1
2.2 -1.1
3.3 -2.2",
));
let column_orders = vec![
ColumnOrder {
column_index: 1,
order_type: OrderType::ascending(),
},
ColumnOrder {
column_index: 0,
order_type: OrderType::ascending(),
},
];
let order_by_executor = Box::new(SortExecutor::new(
Box::new(mock_executor),
Arc::new(column_orders),
"SortExecutor2".to_string(),
CHUNK_SIZE,
MemoryContext::for_spill_test(),
Some(SpillBackend::Memory),
BatchSpillMetrics::for_test(),
));
let fields = &order_by_executor.schema().fields;
assert_eq!(fields[0].data_type, DataType::Float32);
assert_eq!(fields[1].data_type, DataType::Float64);

let mut stream = order_by_executor.execute();
let res = stream.next().await;
assert!(res.is_some());
if let Some(res) = res {
let res = res.unwrap();
let col0 = res.column_at(0);
assert_eq!(col0.as_float32().value_at(0), Some(3.3.into()));
assert_eq!(col0.as_float32().value_at(1), Some(2.2.into()));
assert_eq!(col0.as_float32().value_at(2), Some(1.1.into()));
assert_eq!(col0.as_float32().value_at(3), Some((-1.1).into()));
assert_eq!(col0.as_float32().value_at(4), Some((-2.2).into()));
}
}
}
33 changes: 25 additions & 8 deletions src/batch/src/spill/spill_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use anyhow::anyhow;
use futures_async_stream::try_stream;
use futures_util::AsyncReadExt;
use opendal::layers::RetryLayer;
use opendal::services::Fs;
use opendal::services::{Fs, Memory};
use opendal::Operator;
use prost::Message;
use risingwave_common::array::DataChunk;
Expand All @@ -39,25 +39,42 @@ const RW_MANAGED_SPILL_DIR: &str = "/rw_batch_spill/";
const DEFAULT_IO_BUFFER_SIZE: usize = 256 * 1024;
const DEFAULT_IO_CONCURRENT_TASK: usize = 8;

/// Storage backend used by [`SpillOp`] when an executor spills state
/// that no longer fits in its memory budget.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SpillBackend {
    /// Spill to the local filesystem (the `Fs` opendal service).
    Disk,
    /// Keep spilled data in an in-memory opendal service.
    /// Only for testing purposes.
    Memory,
}

/// `SpillOp` is used to manage the spill directory of the spilling executor and it will drop the directory with a RAII style.
pub struct SpillOp {
    /// Opendal operator rooted at this spill's dedicated directory;
    /// all reads/writes of spilled partitions go through it.
    pub op: Operator,
}

impl SpillOp {
pub fn create(path: String) -> Result<SpillOp> {
pub fn create(path: String, spill_backend: SpillBackend) -> Result<SpillOp> {
assert!(path.ends_with('/'));

let spill_dir =
std::env::var(RW_BATCH_SPILL_DIR_ENV).unwrap_or_else(|_| DEFAULT_SPILL_DIR.to_string());
let root = format!("/{}/{}/{}/", spill_dir, RW_MANAGED_SPILL_DIR, path);

let mut builder = Fs::default();
builder.root(&root);

let op: Operator = Operator::new(builder)?
.layer(RetryLayer::default())
.finish();
let op = match spill_backend {
SpillBackend::Disk => {
let mut builder = Fs::default();
builder.root(&root);
Operator::new(builder)?
.layer(RetryLayer::default())
.finish()
}
SpillBackend::Memory => {
let mut builder = Memory::default();
builder.root(&root);
Operator::new(builder)?
.layer(RetryLayer::default())
.finish()
}
};
Ok(SpillOp { op })
}

Expand Down
4 changes: 4 additions & 0 deletions src/common/src/memory/mem_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ impl MemoryContext {
Self::new_with_mem_limit(None, counter, mem_limit)
}

    /// Creates a memory context with a zero-byte limit, so the first positive
    /// `add` reports the limit as exceeded — forcing executors under test to
    /// take their spill path immediately. Only intended for spill unit tests.
    pub fn for_spill_test() -> Self {
        Self::new_with_mem_limit(None, TrAdderAtomic::new(0), 0)
    }

/// Add `bytes` memory usage. Pass negative value to decrease memory usage.
/// Returns `false` if the memory usage exceeds the limit.
pub fn add(&self, bytes: i64) -> bool {
Expand Down
Loading