diff --git a/src/batch/benches/sort.rs b/src/batch/benches/sort.rs
index a2580c2773c9a..08225604b2c64 100644
--- a/src/batch/benches/sort.rs
+++ b/src/batch/benches/sort.rs
@@ -14,8 +14,11 @@
 pub mod utils;
 
+use std::sync::Arc;
+
 use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion};
 use risingwave_batch::executor::{BoxedExecutor, SortExecutor};
+use risingwave_batch::monitor::BatchSpillMetrics;
 use risingwave_common::enable_jemalloc;
 use risingwave_common::memory::MemoryContext;
 use risingwave_common::types::DataType;
@@ -57,10 +60,12 @@ fn create_order_by_executor(
 
     Box::new(SortExecutor::new(
         child,
-        column_orders,
+        Arc::new(column_orders),
        "SortExecutor".into(),
         CHUNK_SIZE,
         MemoryContext::none(),
+        false,
+        BatchSpillMetrics::for_test(),
     ))
 }
diff --git a/src/batch/src/executor/hash_agg.rs b/src/batch/src/executor/hash_agg.rs
index 8b8f86c637199..d17fd07d3bb5a 100644
--- a/src/batch/src/executor/hash_agg.rs
+++ b/src/batch/src/executor/hash_agg.rs
@@ -589,6 +589,10 @@ impl HashAggExecutor {
             // Finally, we would get e.g. 20 partitions. Each partition should contain a portion of the original hash table and input data.
             // A sub HashAggExecutor would be used to consume each partition one by one.
             // If memory is still not enough in the sub HashAggExecutor, it will spill its hash table and input recursively.
+            info!(
+                "batch hash agg executor {} starts to spill out",
+                &self.identity
+            );
             let mut agg_spill_manager = AggSpillManager::new(
                 &self.identity,
                 DEFAULT_SPILL_PARTITION_NUM,
diff --git a/src/batch/src/executor/join/hash_join.rs b/src/batch/src/executor/join/hash_join.rs
index 24ac63d02ba4e..efa69491811c7 100644
--- a/src/batch/src/executor/join/hash_join.rs
+++ b/src/batch/src/executor/join/hash_join.rs
@@ -548,6 +548,10 @@ impl HashJoinExecutor {
             // Finally, we would get e.g. 20 partitions. Each partition should contain a portion of the original build side and probe side input data.
             // A sub HashJoinExecutor would be used to consume each partition one by one.
             // If memory is still not enough in the sub HashJoinExecutor, it will spill its inputs recursively.
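+            // Both the build side and the probe side are partitioned with the same hash
+            // function, so rows that can join with each other always land in the same
+            // partition and each partition can be joined independently.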
+            info!(
+                "batch hash join executor {} starts to spill out",
+                &self.identity
+            );
             let mut join_spill_manager = JoinSpillManager::new(
                 &self.identity,
                 DEFAULT_SPILL_PARTITION_NUM,
diff --git a/src/batch/src/executor/join/local_lookup_join.rs b/src/batch/src/executor/join/local_lookup_join.rs
index 54d3185de3dca..e5ee7b82a2a18 100644
--- a/src/batch/src/executor/join/local_lookup_join.rs
+++ b/src/batch/src/executor/join/local_lookup_join.rs
@@ -487,6 +487,8 @@ impl HashKeyDispatcher for LocalLookupJoinExecutorArgs {
 
 #[cfg(test)]
 mod tests {
+    use std::sync::Arc;
+
     use risingwave_common::array::{DataChunk, DataChunkTestExt};
     use risingwave_common::catalog::{Field, Schema};
     use risingwave_common::hash::HashKeyDispatcher;
@@ -502,6 +504,7 @@ mod tests {
         diff_executor_output, FakeInnerSideExecutorBuilder, MockExecutor,
     };
     use crate::executor::{BoxedExecutor, SortExecutor};
+    use crate::monitor::BatchSpillMetrics;
     use crate::task::ShutdownToken;
 
     const CHUNK_SIZE: usize = 1024;
@@ -594,10 +597,12 @@ mod tests {
 
         Box::new(SortExecutor::new(
             child,
-            column_orders,
+            Arc::new(column_orders),
             "SortExecutor".into(),
             CHUNK_SIZE,
             MemoryContext::none(),
+            false,
+            BatchSpillMetrics::for_test(),
         ))
     }
diff --git a/src/batch/src/executor/order_by.rs b/src/batch/src/executor/order_by.rs
index 05cd0f8c94fa0..67af7b5569842 100644
--- a/src/batch/src/executor/order_by.rs
+++ b/src/batch/src/executor/order_by.rs
@@ -12,18 +12,31 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::sync::Arc;
+
+use bytes::Bytes;
 use futures_async_stream::try_stream;
+use itertools::Itertools;
+use prost::Message;
 use risingwave_common::array::DataChunk;
 use risingwave_common::catalog::Schema;
 use risingwave_common::memory::MemoryContext;
+use risingwave_common::types::DataType;
 use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
 use risingwave_common::util::memcmp_encoding::encode_chunk;
 use risingwave_common::util::sort_util::ColumnOrder;
 use risingwave_common_estimate_size::EstimateSize;
 use risingwave_pb::batch_plan::plan_node::NodeBody;
+use risingwave_pb::data::DataChunk as PbDataChunk;
 
-use super::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
+use super::{
+    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
+    WrapStreamExecutor,
+};
 use crate::error::{BatchError, Result};
+use crate::executor::merge_sort::MergeSortExecutor;
+use crate::monitor::BatchSpillMetrics;
+use crate::spill::spill_op::{SpillOp, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY};
 use crate::task::BatchTaskContext;
 
 /// Sort Executor
@@ -35,11 +48,15 @@ use crate::task::BatchTaskContext;
 /// 4. Build and yield data chunks according to the row order
 pub struct SortExecutor {
     child: BoxedExecutor,
-    column_orders: Vec<ColumnOrder>,
+    column_orders: Arc<Vec<ColumnOrder>>,
     identity: String,
     schema: Schema,
     chunk_size: usize,
     mem_context: MemoryContext,
+    enable_spill: bool,
+    spill_metrics: Arc<BatchSpillMetrics>,
+    /// The upper bound of memory usage for this executor.
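+    /// It is set only on the spill path (`new_inner`), to the estimated size of the
+    /// spilled partition this sub-executor reads, so that partitions smaller than
+    /// `SPILL_AT_LEAST_MEMORY` skip the memory check instead of spilling again.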
+    memory_upper_bound: Option<u64>,
 }
 
 impl Executor for SortExecutor {
@@ -71,15 +88,17 @@ impl BoxedExecutorBuilder for SortExecutor {
             .column_orders
             .iter()
             .map(ColumnOrder::from_protobuf)
-            .collect();
+            .collect_vec();
 
         let identity = source.plan_node().get_identity();
         Ok(Box::new(SortExecutor::new(
             child,
-            column_orders,
+            Arc::new(column_orders),
             identity.clone(),
             source.context.get_config().developer.chunk_size,
             source.context.create_executor_mem_context(identity),
+            source.context.get_config().enable_spill,
+            source.context.spill_metrics(),
         )))
     }
 }
@@ -87,16 +106,30 @@ impl SortExecutor {
 impl SortExecutor {
     #[try_stream(boxed, ok = DataChunk, error = BatchError)]
     async fn do_execute(self: Box<Self>) {
+        let child_schema = self.child.schema().clone();
+        let mut need_to_spill = false;
+        // If the memory upper bound is less than 1MB, we don't need to check memory usage.
+        let check_memory = match self.memory_upper_bound {
+            Some(upper_bound) => upper_bound > SPILL_AT_LEAST_MEMORY,
+            None => true,
+        };
+
         let mut chunk_builder = DataChunkBuilder::new(self.schema.data_types(), self.chunk_size);
         let mut chunks = Vec::new_in(self.mem_context.global_allocator());
 
+        let mut input_stream = self.child.execute();
         #[for_await]
-        for chunk in self.child.execute() {
+        for chunk in &mut input_stream {
             let chunk = chunk?.compact();
             let chunk_estimated_heap_size = chunk.estimated_heap_size();
             chunks.push(chunk);
-            if !self.mem_context.add(chunk_estimated_heap_size as i64) {
-                Err(BatchError::OutOfMemory(self.mem_context.mem_limit()))?;
+            if !self.mem_context.add(chunk_estimated_heap_size as i64) && check_memory {
+                if self.enable_spill {
+                    need_to_spill = true;
+                    break;
+                } else {
+                    Err(BatchError::OutOfMemory(self.mem_context.mem_limit()))?;
+                }
             }
         }
 
@@ -115,21 +148,102 @@
                 .enumerate()
                 .map(|(row_id, row)| (chunk.row_at_unchecked_vis(row_id), row)),
             );
-            if !self.mem_context.add(chunk_estimated_heap_size as i64) {
-                Err(BatchError::OutOfMemory(self.mem_context.mem_limit()))?;
+            if !self.mem_context.add(chunk_estimated_heap_size as i64) && check_memory {
+                if self.enable_spill {
+                    need_to_spill = true;
+                    break;
+                } else {
+                    Err(BatchError::OutOfMemory(self.mem_context.mem_limit()))?;
+                }
             }
         }
 
-        encoded_rows.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
+        if need_to_spill {
+            // A spilling version of sort, a.k.a. external sort.
+            // When the SortExecutor is told that memory is insufficient, the SortSpillManager
+            // will start to partition the sort buffer and spill it to disk.
+            // After spilling the sort buffer, the SortSpillManager will consume all remaining chunks from its input executor.
+            // Finally, we would get e.g. 20 partitions. Each partition should contain a portion of the original input data.
+            // A sub SortExecutor would be used to sort each partition, and a MergeSortExecutor would then merge all sorted partitions.
+            // If memory is still not enough in a sub SortExecutor, it will spill its inputs recursively.
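+            // Note that rows are assigned to partitions round-robin rather than by hash:
+            // any partitioning works for sort, because each partition is sorted
+            // independently and the sorted partitions are merged afterwards.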
+ info!("batch sort executor {} starts to spill out", &self.identity); + let mut sort_spill_manager = SortSpillManager::new( + &self.identity, + DEFAULT_SPILL_PARTITION_NUM, + child_schema.data_types(), + self.chunk_size, + self.spill_metrics.clone(), + )?; + sort_spill_manager.init_writers().await?; + + // Release memory + drop(encoded_rows); + + // Spill buffer + for chunk in chunks { + sort_spill_manager.write_input_chunk(chunk).await?; + } - for (row, _) in encoded_rows { - if let Some(spilled) = chunk_builder.append_one_row(row) { - yield spilled + // Spill input chunks. + #[for_await] + for chunk in input_stream { + let chunk: DataChunk = chunk?; + sort_spill_manager.write_input_chunk(chunk).await?; + } + + sort_spill_manager.close_writers().await?; + + let partition_num = sort_spill_manager.partition_num; + // Merge sorted-partitions + let mut sorted_inputs: Vec = Vec::with_capacity(partition_num); + for i in 0..partition_num { + let partition_size = sort_spill_manager.estimate_partition_size(i).await?; + + let input_stream = sort_spill_manager.read_input_partition(i).await?; + + let sub_sort_executor: SortExecutor = SortExecutor::new_inner( + Box::new(WrapStreamExecutor::new(child_schema.clone(), input_stream)), + self.column_orders.clone(), + format!("{}-sub{}", self.identity.clone(), i), + self.chunk_size, + self.mem_context.clone(), + self.enable_spill, + self.spill_metrics.clone(), + Some(partition_size), + ); + + debug!( + "create sub_sort {} for sort {} to spill", + sub_sort_executor.identity, self.identity + ); + + sorted_inputs.push(Box::new(sub_sort_executor)); + } + + let merge_sort = MergeSortExecutor::new( + sorted_inputs, + self.column_orders.clone(), + self.schema.clone(), + format!("{}-merge-sort", self.identity.clone()), + self.chunk_size, + self.mem_context.clone(), + ); + + #[for_await] + for chunk in Box::new(merge_sort).execute() { + yield chunk?; + } + } else { + encoded_rows.sort_unstable_by(|(_, a), (_, b)| a.cmp(b)); + + for (row, _) in encoded_rows { + if let Some(spilled) = chunk_builder.append_one_row(row) { + yield spilled + } } - } - if let Some(spilled) = chunk_builder.consume_all() { - yield spilled + if let Some(spilled) = chunk_builder.consume_all() { + yield spilled + } } } } @@ -137,10 +251,34 @@ impl SortExecutor { impl SortExecutor { pub fn new( child: BoxedExecutor, - column_orders: Vec, + column_orders: Arc>, identity: String, chunk_size: usize, mem_context: MemoryContext, + enable_spill: bool, + spill_metrics: Arc, + ) -> Self { + Self::new_inner( + child, + column_orders, + identity, + chunk_size, + mem_context, + enable_spill, + spill_metrics, + None, + ) + } + + fn new_inner( + child: BoxedExecutor, + column_orders: Arc>, + identity: String, + chunk_size: usize, + mem_context: MemoryContext, + enable_spill: bool, + spill_metrics: Arc, + memory_upper_bound: Option, ) -> Self { let schema = child.schema().clone(); Self { @@ -150,18 +288,137 @@ impl SortExecutor { schema, chunk_size, mem_context, + enable_spill, + spill_metrics, + memory_upper_bound, } } } +/// `SortSpillManager` is used to manage how to write spill data file and read them back. +/// The spill data first need to be partitioned in a round-robin way. Each partition contains 1 file: `input_chunks_file` +/// The spill file consume a data chunk and serialize the chunk into a protobuf bytes. +/// Finally, spill file content will look like the below. +/// The file write pattern is append-only and the read pattern is sequential scan. +/// This can maximize the disk IO performance. 
+///
+/// ```text
+/// [proto_len]
+/// [proto_bytes]
+/// ...
+/// [proto_len]
+/// [proto_bytes]
+/// ```
+struct SortSpillManager {
+    op: SpillOp,
+    partition_num: usize,
+    round_robin_idx: usize,
+    input_writers: Vec<opendal::Writer>,
+    input_chunk_builders: Vec<DataChunkBuilder>,
+    child_data_types: Vec<DataType>,
+    spill_chunk_size: usize,
+    spill_metrics: Arc<BatchSpillMetrics>,
+}
+
+impl SortSpillManager {
+    fn new(
+        identity: &String,
+        partition_num: usize,
+        child_data_types: Vec<DataType>,
+        spill_chunk_size: usize,
+        spill_metrics: Arc<BatchSpillMetrics>,
+    ) -> Result<Self> {
+        let suffix_uuid = uuid::Uuid::new_v4();
+        let dir = format!("/{}-{}/", identity, suffix_uuid);
+        let op = SpillOp::create(dir)?;
+        let input_writers = Vec::with_capacity(partition_num);
+        let input_chunk_builders = Vec::with_capacity(partition_num);
+        Ok(Self {
+            op,
+            partition_num,
+            input_writers,
+            input_chunk_builders,
+            round_robin_idx: 0,
+            child_data_types,
+            spill_chunk_size,
+            spill_metrics,
+        })
+    }
+
+    async fn init_writers(&mut self) -> Result<()> {
+        for i in 0..self.partition_num {
+            let partition_file_name = format!("input-chunks-p{}", i);
+            let w = self.op.writer_with(&partition_file_name).await?;
+            self.input_writers.push(w);
+            self.input_chunk_builders.push(DataChunkBuilder::new(
+                self.child_data_types.clone(),
+                self.spill_chunk_size,
+            ));
+        }
+        Ok(())
+    }
+
+    async fn write_input_chunk(&mut self, chunk: DataChunk) -> Result<()> {
+        for row in chunk.rows() {
+            let partition = self.round_robin_idx;
+            if let Some(chunk) = self.input_chunk_builders[partition].append_one_row(row) {
+                let chunk_pb: PbDataChunk = chunk.to_protobuf();
+                let buf = Message::encode_to_vec(&chunk_pb);
+                let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
+                self.spill_metrics
+                    .batch_spill_write_bytes
+                    .inc_by((buf.len() + len_bytes.len()) as u64);
+                self.input_writers[partition].write(len_bytes).await?;
+                self.input_writers[partition].write(buf).await?;
+            }
+            self.round_robin_idx = (self.round_robin_idx + 1) % self.partition_num;
+        }
+        Ok(())
+    }
+
+    async fn close_writers(&mut self) -> Result<()> {
+        for partition in 0..self.partition_num {
+            if let Some(output_chunk) = self.input_chunk_builders[partition].consume_all() {
+                let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
+                let buf = Message::encode_to_vec(&chunk_pb);
+                let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
+                self.spill_metrics
+                    .batch_spill_write_bytes
+                    .inc_by((buf.len() + len_bytes.len()) as u64);
+                self.input_writers[partition].write(len_bytes).await?;
+                self.input_writers[partition].write(buf).await?;
+            }
+        }
+
+        for mut w in self.input_writers.drain(..) {
+            w.close().await?;
+        }
+        Ok(())
+    }
+
+    async fn read_input_partition(&mut self, partition: usize) -> Result<BoxedDataChunkStream> {
+        let input_partition_file_name = format!("input-chunks-p{}", partition);
+        let r = self.op.reader_with(&input_partition_file_name).await?;
+        Ok(SpillOp::read_stream(r, self.spill_metrics.clone()))
+    }
+
+    async fn estimate_partition_size(&self, partition: usize) -> Result<u64> {
+        let input_partition_file_name = format!("input-chunks-p{}", partition);
+        let input_size = self
+            .op
+            .stat(&input_partition_file_name)
+            .await?
+            .content_length();
+        Ok(input_size)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use futures::StreamExt;
     use risingwave_common::array::*;
     use risingwave_common::catalog::Field;
-    use risingwave_common::types::{
-        DataType, Date, Interval, Scalar, StructType, Time, Timestamp, F32,
-    };
+    use risingwave_common::types::{Date, Interval, Scalar, StructType, Time, Timestamp, F32};
     use risingwave_common::util::sort_util::OrderType;
 
     use super::*;
@@ -197,10 +454,12 @@ mod tests {
 
         let order_by_executor = Box::new(SortExecutor::new(
             Box::new(mock_executor),
-            column_orders,
+            Arc::new(column_orders),
             "SortExecutor2".to_string(),
             CHUNK_SIZE,
             MemoryContext::none(),
+            false,
+            BatchSpillMetrics::for_test(),
         ));
         let fields = &order_by_executor.schema().fields;
         assert_eq!(fields[0].data_type, DataType::Int32);
@@ -247,10 +506,12 @@ mod tests {
         ];
         let order_by_executor = Box::new(SortExecutor::new(
             Box::new(mock_executor),
-            column_orders,
+            Arc::new(column_orders),
             "SortExecutor2".to_string(),
             CHUNK_SIZE,
             MemoryContext::none(),
+            false,
+            BatchSpillMetrics::for_test(),
         ));
         let fields = &order_by_executor.schema().fields;
         assert_eq!(fields[0].data_type, DataType::Float32);
@@ -297,10 +558,12 @@ mod tests {
         ];
         let order_by_executor = Box::new(SortExecutor::new(
             Box::new(mock_executor),
-            column_orders,
+            Arc::new(column_orders),
             "SortExecutor2".to_string(),
             CHUNK_SIZE,
             MemoryContext::none(),
+            false,
+            BatchSpillMetrics::for_test(),
         ));
         let fields = &order_by_executor.schema().fields;
         assert_eq!(fields[0].data_type, DataType::Varchar);
@@ -372,10 +635,12 @@ mod tests {
         ];
         let order_by_executor = Box::new(SortExecutor::new(
             Box::new(mock_executor),
-            column_orders,
+            Arc::new(column_orders),
             "SortExecutor".to_string(),
             CHUNK_SIZE,
             MemoryContext::none(),
+            false,
+            BatchSpillMetrics::for_test(),
         ));
 
         let mut stream = order_by_executor.execute();
@@ -452,10 +717,12 @@ mod tests {
         ];
         let order_by_executor = Box::new(SortExecutor::new(
             Box::new(mock_executor),
-            column_orders,
+            Arc::new(column_orders),
             "SortExecutor".to_string(),
             CHUNK_SIZE,
             MemoryContext::none(),
+            false,
+            BatchSpillMetrics::for_test(),
         ));
 
         let mut stream = order_by_executor.execute();
@@ -558,10 +825,12 @@ mod tests {
         ];
         let order_by_executor = Box::new(SortExecutor::new(
             Box::new(mock_executor),
-            column_orders,
+            column_orders.into(),
             "SortExecutor".to_string(),
             CHUNK_SIZE,
             MemoryContext::none(),
+            false,
+            BatchSpillMetrics::for_test(),
         ));
 
         let mut stream = order_by_executor.execute();
@@ -706,10 +975,12 @@ mod tests {
         ];
         let order_by_executor = Box::new(SortExecutor::new(
             Box::new(mock_executor),
-            column_orders,
+            Arc::new(column_orders),
             "SortExecutor".to_string(),
             CHUNK_SIZE,
             MemoryContext::none(),
+            false,
+            BatchSpillMetrics::for_test(),
         ));
 
         let mut stream = order_by_executor.execute();
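
To make the spill framing concrete, here is a small illustrative decoder for the `[proto_len][proto_bytes]` layout written above. This sketch is not part of the diff: `decode_frames` is a hypothetical helper, and in the real code `SpillOp::read_stream` performs this decoding internally.

```rust
// Hypothetical sketch (not in this diff): decode a spill file consisting of
// repeated [u32 little-endian length][protobuf-encoded PbDataChunk] records.
fn decode_frames(mut buf: &[u8]) -> Vec<Vec<u8>> {
    let mut frames = Vec::new();
    while buf.len() >= 4 {
        // Read the little-endian u32 length prefix.
        let len = u32::from_le_bytes(buf[..4].try_into().unwrap()) as usize;
        buf = &buf[4..];
        assert!(buf.len() >= len, "truncated spill file");
        // The next `len` bytes are one protobuf-encoded chunk.
        frames.push(buf[..len].to_vec());
        buf = &buf[len..];
    }
    frames
}
```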