From 511033db27996acf3d6361fbf8fe727b62be931c Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Fri, 10 May 2024 11:42:14 +0800 Subject: [PATCH 01/18] init version --- Cargo.lock | 5 + src/batch/Cargo.toml | 5 + src/batch/src/error.rs | 7 + src/batch/src/executor/hash_agg.rs | 307 ++++++++++++++++++++++++++++- src/common/src/hash/key.rs | 2 +- src/expr/core/src/aggregate/mod.rs | 3 +- 6 files changed, 324 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 28aa7b737e349..736b59b3dadbf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10008,6 +10008,7 @@ dependencies = [ "assert_matches", "async-recursion", "async-trait", + "bytes", "criterion", "either", "foyer", @@ -10021,9 +10022,11 @@ dependencies = [ "madsim-tokio", "madsim-tonic", "memcomparable", + "opendal", "parking_lot 0.12.1", "paste", "prometheus", + "prost 0.12.1", "rand", "risingwave_common", "risingwave_common_estimate_size", @@ -10046,6 +10049,8 @@ dependencies = [ "tokio-stream", "tokio-util", "tracing", + "twox-hash", + "uuid", "workspace-hack", ] diff --git a/src/batch/Cargo.toml b/src/batch/Cargo.toml index 019c33253466b..e61cb348a132f 100644 --- a/src/batch/Cargo.toml +++ b/src/batch/Cargo.toml @@ -48,6 +48,11 @@ scopeguard = "1" serde_json = "1" thiserror = "1" thiserror-ext = { workspace = true } +opendal = "0.45.1" +uuid = { version = "1", features = ["v4"] } +twox-hash = "1" +prost = "0.12" +bytes = "1" tokio = { version = "0.2", package = "madsim-tokio", features = [ "rt", "rt-multi-thread", diff --git a/src/batch/src/error.rs b/src/batch/src/error.rs index 8033ddfb3479b..dbf981ccc32ff 100644 --- a/src/batch/src/error.rs +++ b/src/batch/src/error.rs @@ -139,6 +139,13 @@ pub enum BatchError { #[error("Not enough memory to run this query, batch memory limit is {0} bytes")] OutOfMemory(u64), + + #[error(transparent)] + Spill( + #[from] + #[backtrace] + opendal::Error, + ), } // Serialize/deserialize error. diff --git a/src/batch/src/executor/hash_agg.rs b/src/batch/src/executor/hash_agg.rs index bc487553c9b5f..870b3e65d0c59 100644 --- a/src/batch/src/executor/hash_agg.rs +++ b/src/batch/src/executor/hash_agg.rs @@ -12,20 +12,35 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::hash::{BuildHasher, Hasher}; use std::marker::PhantomData; +use std::ops::{Deref, DerefMut}; +use anyhow::anyhow; +use bytes::Bytes; use futures_async_stream::try_stream; +use futures_util::AsyncReadExt; use itertools::Itertools; +use opendal::layers::RetryLayer; +use opendal::Operator; +use opendal::services::Fs; +use prost::Message; +use twox_hash::XxHash64; use risingwave_common::array::{DataChunk, StreamChunk}; +use risingwave_common::buffer::Bitmap; +use risingwave_pb::data::DataChunk as PbDataChunk; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::hash::{HashKey, HashKeyDispatcher, PrecomputedBuildHasher}; use risingwave_common::memory::MemoryContext; -use risingwave_common::types::DataType; +use risingwave_common::row::{OwnedRow, Row}; +use risingwave_common::types::{DataType, ToOwnedDatum}; +use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common_estimate_size::EstimateSize; use risingwave_expr::aggregate::{AggCall, AggregateState, BoxedAggregateFunction}; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::HashAggNode; +use risingwave_common::row::RowExt; use crate::error::{BatchError, Result}; use crate::executor::aggregation::build as build_agg; @@ -211,6 +226,70 @@ impl Executor for HashAggExecutor { } } +pub fn new_spill_op(root: String) -> Result { + assert!(root.ends_with('/')); + let mut builder = Fs::default(); + builder.root(&root); + + let op: Operator = Operator::new(builder)? + .layer(RetryLayer::default()) + .finish(); + Ok(SpillOp { + op, + }) +} + +pub struct SpillOp { + op: Operator, +} + +impl SpillOp { + pub async fn writer_with(&self, name: &str) -> Result { + Ok(self.op.writer_with(name).append(true).buffer(64 * 1024).await?) + } + + pub async fn reader_with(&self, name: &str) -> Result { + Ok(self.op.reader_with(name).buffer(64 * 1024).await?) + } +} + +impl Drop for SpillOp { + fn drop(&mut self) { + let op = self.op.clone(); + tokio::task::spawn(async move { + let result = op.remove_all("/").await; + if let Err(e) = result { + error!("Failed to remove spill directory: {}", e); + } + }); + } +} + +impl DerefMut for SpillOp { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.op + } +} + +impl Deref for SpillOp { + type Target = Operator; + + fn deref(&self) -> &Self::Target { + &self.op + } +} + +#[derive(Default, Clone, Copy)] +pub struct SpillBuildHasher(u64); + +impl BuildHasher for SpillBuildHasher { + type Hasher = XxHash64; + + fn build_hasher(&self) -> Self::Hasher { + XxHash64::with_seed(self.0) + } +} + impl HashAggExecutor { #[try_stream(boxed, ok = DataChunk, error = BatchError)] async fn do_execute(self: Box) { @@ -220,9 +299,15 @@ impl HashAggExecutor { self.mem_context.global_allocator(), ); + let child_schema = self.child.schema().clone(); + + let enable_spill = true; + let mut need_to_spill = false; + + let mut input_stream = self.child.execute(); // consume all chunks to compute the agg result #[for_await] - for chunk in self.child.execute() { + for chunk in &mut input_stream { let chunk = StreamChunk::from(chunk?); let keys = K::build_many(self.group_key_columns.as_slice(), &chunk); let mut memory_usage_diff = 0; @@ -251,7 +336,223 @@ impl HashAggExecutor { } // update memory usage if !self.mem_context.add(memory_usage_diff) { - Err(BatchError::OutOfMemory(self.mem_context.mem_limit()))?; + if enable_spill { + need_to_spill = true; + break; + } else { + Err(BatchError::OutOfMemory(self.mem_context.mem_limit()))?; + } + } + } + + if need_to_spill { + let suffix_uuid = uuid::Uuid::new_v4(); + let dir = format!("/tmp/rw_batch_spill-{}-{}/", &self.identity, suffix_uuid.to_string()); + let op = new_spill_op(dir.into())?; + let partition_num = 4; + let mut agg_state_writers = Vec::with_capacity(partition_num); + let mut agg_state_chunk_builder = Vec::with_capacity(partition_num); + let mut writers = Vec::with_capacity(partition_num); + let mut chunk_builders = Vec::with_capacity(partition_num); + let spill_build_hasher = SpillBuildHasher(suffix_uuid.as_u64_pair().1); + + for i in 0..partition_num { + let agg_state_partition_file_name = format!("agg-state-p{}", i); + let w = op.writer_with(&agg_state_partition_file_name).await?; + agg_state_writers.push(w); + + let partition_file_name = format!("input-chunks-p{}", i); + let w = op.writer_with(&partition_file_name).await?; + writers.push(w); + chunk_builders.push(DataChunkBuilder::new(child_schema.data_types(), 1024)); + agg_state_chunk_builder.push(DataChunkBuilder::new(self + .group_key_types + .iter().cloned().chain(self + .aggs + .iter().map(|agg| agg.return_type())).collect(), 1024)); + } + + // Spill the agg state + let mut memory_usage_diff = 0; + for (key, states) in groups { + let key_row = key.deserialize(&self.group_key_types)?; + let mut agg_datums = vec![]; + for (agg, state) in self.aggs.iter().zip_eq_fast(states) { + let encode_state = agg.encode_state(&state)?; + memory_usage_diff -= state.estimated_size() as i64; + agg_datums.push(encode_state); + } + let agg_state_row = OwnedRow::from_iter(agg_datums.into_iter()); + let mut hasher = spill_build_hasher.build_hasher(); + key.hash(&mut hasher); + let hash_code = hasher.finish(); + let partition = hash_code as usize % partition_num; + if let Some(output_chunk) = agg_state_chunk_builder[partition].append_one_row(key_row.chain(agg_state_row)) { + let chunk_pb: PbDataChunk = output_chunk.to_protobuf(); + let buf = Message::encode_to_vec(&chunk_pb); + let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes()); + agg_state_writers[partition].write(len_bytes).await?; + agg_state_writers[partition].write(buf).await?; + } + } + + for i in 0..partition_num { + if let Some(output_chunk) = agg_state_chunk_builder[i].consume_all() { + let chunk_pb: PbDataChunk = output_chunk.to_protobuf(); + let buf = Message::encode_to_vec(&chunk_pb); + let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes()); + agg_state_writers[i].write(len_bytes).await?; + agg_state_writers[i].write(buf).await?; + } + } + + for mut w in agg_state_writers { + w.close().await?; + } + + self.mem_context.add(memory_usage_diff); + + groups = AggHashMap::::with_hasher_in( + PrecomputedBuildHasher, + self.mem_context.global_allocator(), + ); + + + #[for_await] + for chunk in input_stream { + let chunk: DataChunk = chunk?; + let hash_codes = chunk.get_hash_values(self.group_key_columns.as_slice(), spill_build_hasher); + let (columns, vis) = chunk.into_parts_v2(); + for i in 0..partition_num { + let new_vis = vis.clone() & Bitmap::from_iter(hash_codes.iter().map(|hash_code| (hash_code.value() as usize % partition_num) == i)); + let new_chunk = DataChunk::from_parts(columns.clone(), new_vis); + for output_chunk in chunk_builders[i].append_chunk(new_chunk) { + let chunk_pb: PbDataChunk = output_chunk.to_protobuf(); + let buf = Message::encode_to_vec(&chunk_pb); + let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes()); + writers[i].write(len_bytes).await?; + writers[i].write(buf).await?; + } + } + } + + for i in 0..partition_num { + if let Some(output_chunk) = chunk_builders[i].consume_all() { + let chunk_pb: PbDataChunk = output_chunk.to_protobuf(); + let buf = Message::encode_to_vec(&chunk_pb); + let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes()); + writers[i].write(len_bytes).await?; + writers[i].write(buf).await?; + } + } + + for mut w in writers { + w.close().await?; + } + + let mut agg_state_readers = Vec::with_capacity(partition_num); + for i in 0..partition_num { + let agg_state_partition_file_name = format!("agg-state-p{}", i); + let r = op.reader_with(&agg_state_partition_file_name).await?; + agg_state_readers.push(r); + } + + let mut readers = Vec::with_capacity(partition_num); + for i in 0..partition_num { + let partition_file_name = format!("input-chunks-p{}", i); + let r = op.reader_with(&partition_file_name).await?; + readers.push(r); + } + + for mut agg_state_reader in agg_state_readers { + let mut buf = [0u8; 4]; + loop { + if let Err(err) = agg_state_reader.read_exact(&mut buf).await { + if err.kind() == std::io::ErrorKind::UnexpectedEof { + break + } else { + return Err(anyhow!(err).into()); + } + } + let len = u32::from_le_bytes(buf) as usize; + let mut buf = vec![0u8; len]; + agg_state_reader.read_exact(&mut buf).await.map_err(|e| anyhow!(e))?; + let chunk_pb: PbDataChunk = Message::decode(buf.as_slice()).map_err(|e| anyhow!(e))?; + let chunk = DataChunk::from_protobuf(&chunk_pb)?; + // println!("agg state chunk = {}", chunk.to_pretty()); + + let group_key_indices = (0..self.group_key_columns.len()).collect_vec(); + let keys = K::build_many(&group_key_indices, &chunk); + let mut memory_usage_diff = 0; + for (row_id, key) in keys.into_iter().enumerate() { + let mut agg_states = vec![]; + for i in 0..self.aggs.len() { + let agg = &self.aggs[i]; + let datum = chunk.row_at(row_id).0.datum_at(self.group_key_columns.len() + i).to_owned_datum(); + let agg_state = agg.decode_state(datum)?; + memory_usage_diff += agg_state.estimated_size() as i64; + agg_states.push(agg_state); + } + groups.try_insert(key, agg_states).unwrap(); + } + + // if !self.mem_context.add(memory_usage_diff) { + // Err(BatchError::OutOfMemory(self.mem_context.mem_limit()))?; + // } + + } + } + + for mut reader in readers { + let mut buf = [0u8; 4]; + loop { + if let Err(err) = reader.read_exact(&mut buf).await { + if err.kind() == std::io::ErrorKind::UnexpectedEof { + break + } else { + return Err(anyhow!(err).into()); + } + } + let len = u32::from_le_bytes(buf) as usize; + let mut buf = vec![0u8; len]; + reader.read_exact(&mut buf).await.map_err(|e| anyhow!(e))?; + let chunk_pb: PbDataChunk = Message::decode(buf.as_slice()).map_err(|e| anyhow!(e))?; + let chunk = DataChunk::from_protobuf(&chunk_pb)?; + // println!("chunk = {}", chunk.to_pretty()); + + // Process chunk + let chunk = StreamChunk::from(chunk); + let keys = K::build_many(self.group_key_columns.as_slice(), &chunk); + let mut memory_usage_diff = 0; + for (row_id, (key, visible)) in keys + .into_iter() + .zip_eq_fast(chunk.visibility().iter()) + .enumerate() + { + if !visible { + continue; + } + let mut new_group = false; + let states = groups.entry(key).or_insert_with(|| { + new_group = true; + self.aggs.iter().map(|agg| agg.create_state()).collect() + }); + + // TODO: currently not a vectorized implementation + for (agg, state) in self.aggs.iter().zip_eq_fast(states) { + if !new_group { + memory_usage_diff -= state.estimated_size() as i64; + } + agg.update_range(state, &chunk, row_id..row_id + 1).await?; + memory_usage_diff += state.estimated_size() as i64; + } + } + // update memory usage + // if !self.mem_context.add(memory_usage_diff) { + // Err(BatchError::OutOfMemory(self.mem_context.mem_limit()))?; + // } + + } } } diff --git a/src/common/src/hash/key.rs b/src/common/src/hash/key.rs index c136710edb23f..3a07763a64282 100644 --- a/src/common/src/hash/key.rs +++ b/src/common/src/hash/key.rs @@ -237,7 +237,7 @@ impl From for HashCode { } impl HashCode { - pub fn value(self) -> u64 { + pub fn value(&self) -> u64 { self.value } } diff --git a/src/expr/core/src/aggregate/mod.rs b/src/expr/core/src/aggregate/mod.rs index 4e8666e8c9dee..2b005983897ad 100644 --- a/src/expr/core/src/aggregate/mod.rs +++ b/src/expr/core/src/aggregate/mod.rs @@ -14,6 +14,7 @@ use std::fmt::Debug; use std::ops::Range; +use anyhow::anyhow; use downcast_rs::{impl_downcast, Downcast}; use itertools::Itertools; @@ -58,7 +59,7 @@ pub trait AggregateFunction: Send + Sync + 'static { fn encode_state(&self, state: &AggregateState) -> Result { match state { AggregateState::Datum(d) => Ok(d.clone()), - _ => panic!("cannot encode state"), + AggregateState::Any(_) => Err(ExprError::Internal(anyhow!("cannot encode state"))), } } From 0710380689c322ae31161801a32fa2bf13f1bf28 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Fri, 10 May 2024 15:07:30 +0800 Subject: [PATCH 02/18] refactor --- src/batch/Cargo.toml | 10 +- src/batch/src/executor/hash_agg.rs | 357 +++++++++++++++++------------ src/expr/core/src/aggregate/mod.rs | 2 +- 3 files changed, 221 insertions(+), 148 deletions(-) diff --git a/src/batch/Cargo.toml b/src/batch/Cargo.toml index e61cb348a132f..2ca8ed1be4e77 100644 --- a/src/batch/Cargo.toml +++ b/src/batch/Cargo.toml @@ -20,6 +20,7 @@ arrow-schema = { workspace = true } assert_matches = "1" async-recursion = "1" async-trait = "0.1" +bytes = "1" either = "1" foyer = { workspace = true } futures = { version = "0.3", default-features = false, features = ["alloc"] } @@ -30,9 +31,11 @@ hytra = "0.1.2" icelake = { workspace = true } itertools = { workspace = true } memcomparable = "0.2" +opendal = "0.45.1" parking_lot = { workspace = true } paste = "1" prometheus = { version = "0.13", features = ["process"] } +prost = "0.12" rand = { workspace = true } risingwave_common = { workspace = true } risingwave_common_estimate_size = { workspace = true } @@ -48,11 +51,6 @@ scopeguard = "1" serde_json = "1" thiserror = "1" thiserror-ext = { workspace = true } -opendal = "0.45.1" -uuid = { version = "1", features = ["v4"] } -twox-hash = "1" -prost = "0.12" -bytes = "1" tokio = { version = "0.2", package = "madsim-tokio", features = [ "rt", "rt-multi-thread", @@ -67,6 +65,8 @@ tokio-stream = "0.1" tokio-util = { workspace = true } tonic = { workspace = true } tracing = "0.1" +twox-hash = "1" +uuid = { version = "1", features = ["v4"] } [target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../workspace-hack" } diff --git a/src/batch/src/executor/hash_agg.rs b/src/batch/src/executor/hash_agg.rs index 870b3e65d0c59..8f1e5d04c5eca 100644 --- a/src/batch/src/executor/hash_agg.rs +++ b/src/batch/src/executor/hash_agg.rs @@ -12,27 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::hash::{BuildHasher, Hasher}; +use std::hash::BuildHasher; use std::marker::PhantomData; use std::ops::{Deref, DerefMut}; + use anyhow::anyhow; use bytes::Bytes; - use futures_async_stream::try_stream; use futures_util::AsyncReadExt; use itertools::Itertools; use opendal::layers::RetryLayer; -use opendal::Operator; use opendal::services::Fs; +use opendal::Operator; use prost::Message; -use twox_hash::XxHash64; use risingwave_common::array::{DataChunk, StreamChunk}; use risingwave_common::buffer::Bitmap; -use risingwave_pb::data::DataChunk as PbDataChunk; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::hash::{HashKey, HashKeyDispatcher, PrecomputedBuildHasher}; use risingwave_common::memory::MemoryContext; -use risingwave_common::row::{OwnedRow, Row}; +use risingwave_common::row::{OwnedRow, Row, RowExt}; use risingwave_common::types::{DataType, ToOwnedDatum}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_common::util::iter_util::ZipEqFast; @@ -40,7 +38,8 @@ use risingwave_common_estimate_size::EstimateSize; use risingwave_expr::aggregate::{AggCall, AggregateState, BoxedAggregateFunction}; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::HashAggNode; -use risingwave_common::row::RowExt; +use risingwave_pb::data::DataChunk as PbDataChunk; +use twox_hash::XxHash64; use crate::error::{BatchError, Result}; use crate::executor::aggregation::build as build_agg; @@ -234,9 +233,7 @@ pub fn new_spill_op(root: String) -> Result { let op: Operator = Operator::new(builder)? .layer(RetryLayer::default()) .finish(); - Ok(SpillOp { - op, - }) + Ok(SpillOp { op }) } pub struct SpillOp { @@ -245,7 +242,12 @@ pub struct SpillOp { impl SpillOp { pub async fn writer_with(&self, name: &str) -> Result { - Ok(self.op.writer_with(name).append(true).buffer(64 * 1024).await?) + Ok(self + .op + .writer_with(name) + .append(true) + .buffer(64 * 1024) + .await?) } pub async fn reader_with(&self, name: &str) -> Result { @@ -290,6 +292,163 @@ impl BuildHasher for SpillBuildHasher { } } +pub struct AggSpillManager { + op: SpillOp, + partition_num: usize, + agg_state_writers: Vec, + agg_state_readers: Vec, + agg_state_chunk_builder: Vec, + input_writers: Vec, + input_readers: Vec, + input_chunk_builders: Vec, + spill_build_hasher: SpillBuildHasher, + group_key_types: Vec, + child_data_types: Vec, + agg_data_types: Vec, + spill_chunk_size: usize, +} + +impl AggSpillManager { + pub fn new( + agg_identity: &String, + partition_num: usize, + group_key_types: Vec, + agg_data_types: Vec, + child_data_types: Vec, + spill_chunk_size: usize, + ) -> Result { + let suffix_uuid = uuid::Uuid::new_v4(); + let dir = format!("/tmp/rw_batch_spill-{}-{}/", agg_identity, suffix_uuid,); + let op = new_spill_op(dir)?; + let agg_state_writers = Vec::with_capacity(partition_num); + let agg_state_readers = Vec::with_capacity(partition_num); + let agg_state_chunk_builder = Vec::with_capacity(partition_num); + let input_writers = Vec::with_capacity(partition_num); + let input_readers = Vec::with_capacity(partition_num); + let input_chunk_builders = Vec::with_capacity(partition_num); + let spill_build_hasher = SpillBuildHasher(suffix_uuid.as_u64_pair().1); + Ok(Self { + op, + partition_num, + agg_state_writers, + agg_state_readers, + agg_state_chunk_builder, + input_writers, + input_readers, + input_chunk_builders, + spill_build_hasher, + group_key_types, + child_data_types, + agg_data_types, + spill_chunk_size, + }) + } + + pub async fn init_writers(&mut self) -> Result<()> { + for i in 0..self.partition_num { + let agg_state_partition_file_name = format!("agg-state-p{}", i); + let w = self.op.writer_with(&agg_state_partition_file_name).await?; + self.agg_state_writers.push(w); + + let partition_file_name = format!("input-chunks-p{}", i); + let w = self.op.writer_with(&partition_file_name).await?; + self.input_writers.push(w); + self.input_chunk_builders.push(DataChunkBuilder::new( + self.child_data_types.clone(), + self.spill_chunk_size, + )); + self.agg_state_chunk_builder.push(DataChunkBuilder::new( + self.group_key_types + .iter() + .cloned() + .chain(self.agg_data_types.iter().cloned()) + .collect(), + self.spill_chunk_size, + )); + } + Ok(()) + } + + pub async fn write_agg_state_row(&mut self, row: impl Row, hash_code: u64) -> Result<()> { + let partition = hash_code as usize % self.partition_num; + if let Some(output_chunk) = self.agg_state_chunk_builder[partition].append_one_row(row) { + let chunk_pb: PbDataChunk = output_chunk.to_protobuf(); + let buf = Message::encode_to_vec(&chunk_pb); + let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes()); + self.agg_state_writers[partition].write(len_bytes).await?; + self.agg_state_writers[partition].write(buf).await?; + } + Ok(()) + } + + pub async fn write_input_chunk( + &mut self, + chunk: DataChunk, + hash_codes: Vec, + ) -> Result<()> { + let (columns, vis) = chunk.into_parts_v2(); + for partition in 0..self.partition_num { + let new_vis = vis.clone() + & Bitmap::from_iter( + hash_codes + .iter() + .map(|hash_code| (*hash_code as usize % self.partition_num) == partition), + ); + let new_chunk = DataChunk::from_parts(columns.clone(), new_vis); + for output_chunk in self.input_chunk_builders[partition].append_chunk(new_chunk) { + let chunk_pb: PbDataChunk = output_chunk.to_protobuf(); + let buf = Message::encode_to_vec(&chunk_pb); + let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes()); + self.input_writers[partition].write(len_bytes).await?; + self.input_writers[partition].write(buf).await?; + } + } + Ok(()) + } + + pub async fn close_writers(&mut self) -> Result<()> { + for mut w in self.agg_state_writers.drain(..) { + w.close().await?; + } + for mut w in self.input_writers.drain(..) { + w.close().await?; + } + Ok(()) + } + + #[try_stream(boxed, ok = DataChunk, error = BatchError)] + async fn read_stream(mut reader: opendal::Reader) { + let mut buf = [0u8; 4]; + loop { + if let Err(err) = reader.read_exact(&mut buf).await { + if err.kind() == std::io::ErrorKind::UnexpectedEof { + break; + } else { + return Err(anyhow!(err).into()); + } + } + let len = u32::from_le_bytes(buf) as usize; + let mut buf = vec![0u8; len]; + reader.read_exact(&mut buf).await.map_err(|e| anyhow!(e))?; + let chunk_pb: PbDataChunk = Message::decode(buf.as_slice()).map_err(|e| anyhow!(e))?; + let chunk = DataChunk::from_protobuf(&chunk_pb)?; + yield chunk; + } + } + + async fn read_agg_state_partition(&mut self, partition: usize) -> Result { + let agg_state_partition_file_name = format!("agg-state-p{}", partition); + let r = self.op.reader_with(&agg_state_partition_file_name).await?; + Ok(Self::read_stream(r)) + } + + async fn read_input_partition(&mut self, partition: usize) -> Result { + let input_partition_file_name = format!("input-chunks-p{}", partition); + let r = self.op.reader_with(&input_partition_file_name).await?; + Ok(Self::read_stream(r)) + } +} + impl HashAggExecutor { #[try_stream(boxed, ok = DataChunk, error = BatchError)] async fn do_execute(self: Box) { @@ -346,34 +505,18 @@ impl HashAggExecutor { } if need_to_spill { - let suffix_uuid = uuid::Uuid::new_v4(); - let dir = format!("/tmp/rw_batch_spill-{}-{}/", &self.identity, suffix_uuid.to_string()); - let op = new_spill_op(dir.into())?; - let partition_num = 4; - let mut agg_state_writers = Vec::with_capacity(partition_num); - let mut agg_state_chunk_builder = Vec::with_capacity(partition_num); - let mut writers = Vec::with_capacity(partition_num); - let mut chunk_builders = Vec::with_capacity(partition_num); - let spill_build_hasher = SpillBuildHasher(suffix_uuid.as_u64_pair().1); - - for i in 0..partition_num { - let agg_state_partition_file_name = format!("agg-state-p{}", i); - let w = op.writer_with(&agg_state_partition_file_name).await?; - agg_state_writers.push(w); - - let partition_file_name = format!("input-chunks-p{}", i); - let w = op.writer_with(&partition_file_name).await?; - writers.push(w); - chunk_builders.push(DataChunkBuilder::new(child_schema.data_types(), 1024)); - agg_state_chunk_builder.push(DataChunkBuilder::new(self - .group_key_types - .iter().cloned().chain(self - .aggs - .iter().map(|agg| agg.return_type())).collect(), 1024)); - } + let mut agg_spill_manager = AggSpillManager::new( + &self.identity, + 4, + self.group_key_types.clone(), + self.aggs.iter().map(|agg| agg.return_type()).collect(), + child_schema.data_types(), + 1024, + )?; + agg_spill_manager.init_writers().await?; - // Spill the agg state let mut memory_usage_diff = 0; + // Spill agg states for (key, states) in groups { let key_row = key.deserialize(&self.group_key_types)?; let mut agg_datums = vec![]; @@ -383,104 +526,46 @@ impl HashAggExecutor { agg_datums.push(encode_state); } let agg_state_row = OwnedRow::from_iter(agg_datums.into_iter()); - let mut hasher = spill_build_hasher.build_hasher(); - key.hash(&mut hasher); - let hash_code = hasher.finish(); - let partition = hash_code as usize % partition_num; - if let Some(output_chunk) = agg_state_chunk_builder[partition].append_one_row(key_row.chain(agg_state_row)) { - let chunk_pb: PbDataChunk = output_chunk.to_protobuf(); - let buf = Message::encode_to_vec(&chunk_pb); - let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes()); - agg_state_writers[partition].write(len_bytes).await?; - agg_state_writers[partition].write(buf).await?; - } - } - - for i in 0..partition_num { - if let Some(output_chunk) = agg_state_chunk_builder[i].consume_all() { - let chunk_pb: PbDataChunk = output_chunk.to_protobuf(); - let buf = Message::encode_to_vec(&chunk_pb); - let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes()); - agg_state_writers[i].write(len_bytes).await?; - agg_state_writers[i].write(buf).await?; - } - } - - for mut w in agg_state_writers { - w.close().await?; + let hash_code = agg_spill_manager.spill_build_hasher.hash_one(key); + agg_spill_manager + .write_agg_state_row(key_row.chain(agg_state_row), hash_code) + .await?; } + // Release memory occupied by agg hash map self.mem_context.add(memory_usage_diff); - groups = AggHashMap::::with_hasher_in( PrecomputedBuildHasher, self.mem_context.global_allocator(), ); - + // Spill input chunks #[for_await] for chunk in input_stream { let chunk: DataChunk = chunk?; - let hash_codes = chunk.get_hash_values(self.group_key_columns.as_slice(), spill_build_hasher); - let (columns, vis) = chunk.into_parts_v2(); - for i in 0..partition_num { - let new_vis = vis.clone() & Bitmap::from_iter(hash_codes.iter().map(|hash_code| (hash_code.value() as usize % partition_num) == i)); - let new_chunk = DataChunk::from_parts(columns.clone(), new_vis); - for output_chunk in chunk_builders[i].append_chunk(new_chunk) { - let chunk_pb: PbDataChunk = output_chunk.to_protobuf(); - let buf = Message::encode_to_vec(&chunk_pb); - let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes()); - writers[i].write(len_bytes).await?; - writers[i].write(buf).await?; - } - } - } - - for i in 0..partition_num { - if let Some(output_chunk) = chunk_builders[i].consume_all() { - let chunk_pb: PbDataChunk = output_chunk.to_protobuf(); - let buf = Message::encode_to_vec(&chunk_pb); - let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes()); - writers[i].write(len_bytes).await?; - writers[i].write(buf).await?; - } - } - - for mut w in writers { - w.close().await?; - } - - let mut agg_state_readers = Vec::with_capacity(partition_num); - for i in 0..partition_num { - let agg_state_partition_file_name = format!("agg-state-p{}", i); - let r = op.reader_with(&agg_state_partition_file_name).await?; - agg_state_readers.push(r); + let hash_codes = chunk.get_hash_values( + self.group_key_columns.as_slice(), + agg_spill_manager.spill_build_hasher, + ); + agg_spill_manager + .write_input_chunk( + chunk, + hash_codes + .into_iter() + .map(|hash_code| hash_code.value()) + .collect(), + ) + .await?; } - let mut readers = Vec::with_capacity(partition_num); - for i in 0..partition_num { - let partition_file_name = format!("input-chunks-p{}", i); - let r = op.reader_with(&partition_file_name).await?; - readers.push(r); - } + agg_spill_manager.close_writers().await?; - for mut agg_state_reader in agg_state_readers { - let mut buf = [0u8; 4]; - loop { - if let Err(err) = agg_state_reader.read_exact(&mut buf).await { - if err.kind() == std::io::ErrorKind::UnexpectedEof { - break - } else { - return Err(anyhow!(err).into()); - } - } - let len = u32::from_le_bytes(buf) as usize; - let mut buf = vec![0u8; len]; - agg_state_reader.read_exact(&mut buf).await.map_err(|e| anyhow!(e))?; - let chunk_pb: PbDataChunk = Message::decode(buf.as_slice()).map_err(|e| anyhow!(e))?; - let chunk = DataChunk::from_protobuf(&chunk_pb)?; - // println!("agg state chunk = {}", chunk.to_pretty()); + for i in 0..agg_spill_manager.partition_num { + let agg_state_stream = agg_spill_manager.read_agg_state_partition(i).await?; + #[for_await] + for chunk in agg_state_stream { + let chunk = chunk?; let group_key_indices = (0..self.group_key_columns.len()).collect_vec(); let keys = K::build_many(&group_key_indices, &chunk); let mut memory_usage_diff = 0; @@ -488,7 +573,11 @@ impl HashAggExecutor { let mut agg_states = vec![]; for i in 0..self.aggs.len() { let agg = &self.aggs[i]; - let datum = chunk.row_at(row_id).0.datum_at(self.group_key_columns.len() + i).to_owned_datum(); + let datum = chunk + .row_at(row_id) + .0 + .datum_at(self.group_key_columns.len() + i) + .to_owned_datum(); let agg_state = agg.decode_state(datum)?; memory_usage_diff += agg_state.estimated_size() as i64; agg_states.push(agg_state); @@ -499,28 +588,13 @@ impl HashAggExecutor { // if !self.mem_context.add(memory_usage_diff) { // Err(BatchError::OutOfMemory(self.mem_context.mem_limit()))?; // } - } - } - for mut reader in readers { - let mut buf = [0u8; 4]; - loop { - if let Err(err) = reader.read_exact(&mut buf).await { - if err.kind() == std::io::ErrorKind::UnexpectedEof { - break - } else { - return Err(anyhow!(err).into()); - } - } - let len = u32::from_le_bytes(buf) as usize; - let mut buf = vec![0u8; len]; - reader.read_exact(&mut buf).await.map_err(|e| anyhow!(e))?; - let chunk_pb: PbDataChunk = Message::decode(buf.as_slice()).map_err(|e| anyhow!(e))?; - let chunk = DataChunk::from_protobuf(&chunk_pb)?; - // println!("chunk = {}", chunk.to_pretty()); - - // Process chunk + let input_stream = agg_spill_manager.read_input_partition(i).await?; + + #[for_await] + for chunk in input_stream { + let chunk = chunk?; let chunk = StreamChunk::from(chunk); let keys = K::build_many(self.group_key_columns.as_slice(), &chunk); let mut memory_usage_diff = 0; @@ -551,7 +625,6 @@ impl HashAggExecutor { // if !self.mem_context.add(memory_usage_diff) { // Err(BatchError::OutOfMemory(self.mem_context.mem_limit()))?; // } - } } } diff --git a/src/expr/core/src/aggregate/mod.rs b/src/expr/core/src/aggregate/mod.rs index 2b005983897ad..40bfb13119766 100644 --- a/src/expr/core/src/aggregate/mod.rs +++ b/src/expr/core/src/aggregate/mod.rs @@ -14,8 +14,8 @@ use std::fmt::Debug; use std::ops::Range; -use anyhow::anyhow; +use anyhow::anyhow; use downcast_rs::{impl_downcast, Downcast}; use itertools::Itertools; use risingwave_common::array::StreamChunk; From 04230e1dd5eeb014d1a92d77a464219d7be2001f Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Fri, 10 May 2024 16:21:16 +0800 Subject: [PATCH 03/18] recursive spill agg --- src/batch/src/executor/hash_agg.rs | 233 ++++++++++++++++------------- src/batch/src/executor/utils.rs | 25 ++++ 2 files changed, 150 insertions(+), 108 deletions(-) diff --git a/src/batch/src/executor/hash_agg.rs b/src/batch/src/executor/hash_agg.rs index 8f1e5d04c5eca..cdc793bdef993 100644 --- a/src/batch/src/executor/hash_agg.rs +++ b/src/batch/src/executor/hash_agg.rs @@ -15,6 +15,7 @@ use std::hash::BuildHasher; use std::marker::PhantomData; use std::ops::{Deref, DerefMut}; +use std::sync::Arc; use anyhow::anyhow; use bytes::Bytes; @@ -45,6 +46,7 @@ use crate::error::{BatchError, Result}; use crate::executor::aggregation::build as build_agg; use crate::executor::{ BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, + WrapStreamExecutor, }; use crate::task::{BatchTaskContext, ShutdownToken, TaskId}; @@ -56,11 +58,12 @@ impl HashKeyDispatcher for HashAggExecutorBuilder { fn dispatch_impl(self) -> Self::Output { Box::new(HashAggExecutor::::new( - self.aggs, + Arc::new(self.aggs), self.group_key_columns, self.group_key_types, self.schema, self.child, + None, self.identity, self.chunk_size, self.mem_context, @@ -169,7 +172,7 @@ impl BoxedExecutorBuilder for HashAggExecutorBuilder { /// `HashAggExecutor` implements the hash aggregate algorithm. pub struct HashAggExecutor { /// Aggregate functions. - aggs: Vec, + aggs: Arc>, /// Column indexes that specify a group group_key_columns: Vec, /// Data types of group key columns @@ -177,6 +180,8 @@ pub struct HashAggExecutor { /// Output schema schema: Schema, child: BoxedExecutor, + /// Used to initialize the state of the aggregation from the spilled files. + init_agg_state_executor: Option, identity: String, chunk_size: usize, mem_context: MemoryContext, @@ -186,11 +191,12 @@ pub struct HashAggExecutor { impl HashAggExecutor { pub fn new( - aggs: Vec, + aggs: Arc>, group_key_columns: Vec, group_key_types: Vec, schema: Schema, child: BoxedExecutor, + init_agg_state_executor: Option, identity: String, chunk_size: usize, mem_context: MemoryContext, @@ -202,6 +208,7 @@ impl HashAggExecutor { group_key_types, schema, child, + init_agg_state_executor, identity, chunk_size, mem_context, @@ -407,6 +414,24 @@ impl AggSpillManager { } pub async fn close_writers(&mut self) -> Result<()> { + for partition in 0..self.partition_num { + if let Some(output_chunk) = self.agg_state_chunk_builder[partition].consume_all() { + let chunk_pb: PbDataChunk = output_chunk.to_protobuf(); + let buf = Message::encode_to_vec(&chunk_pb); + let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes()); + self.agg_state_writers[partition].write(len_bytes).await?; + self.agg_state_writers[partition].write(buf).await?; + } + + if let Some(output_chunk) = self.input_chunk_builders[partition].consume_all() { + let chunk_pb: PbDataChunk = output_chunk.to_protobuf(); + let buf = Message::encode_to_vec(&chunk_pb); + let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes()); + self.input_writers[partition].write(len_bytes).await?; + self.input_writers[partition].write(buf).await?; + } + } + for mut w in self.agg_state_writers.drain(..) { w.close().await?; } @@ -452,16 +477,45 @@ impl AggSpillManager { impl HashAggExecutor { #[try_stream(boxed, ok = DataChunk, error = BatchError)] async fn do_execute(self: Box) { + let child_schema = self.child.schema().clone(); + let enable_spill = true; + let mut need_to_spill = false; + // hash map for each agg groups let mut groups = AggHashMap::::with_hasher_in( PrecomputedBuildHasher, self.mem_context.global_allocator(), ); - let child_schema = self.child.schema().clone(); + if let Some(init_agg_state_executor) = self.init_agg_state_executor { + let mut init_agg_state_stream = init_agg_state_executor.execute(); + #[for_await] + for chunk in &mut init_agg_state_stream { + let chunk = chunk?; + let group_key_indices = (0..self.group_key_columns.len()).collect_vec(); + let keys = K::build_many(&group_key_indices, &chunk); + let mut memory_usage_diff = 0; + for (row_id, key) in keys.into_iter().enumerate() { + let mut agg_states = vec![]; + for i in 0..self.aggs.len() { + let agg = &self.aggs[i]; + let datum = chunk + .row_at(row_id) + .0 + .datum_at(self.group_key_columns.len() + i) + .to_owned_datum(); + let agg_state = agg.decode_state(datum)?; + memory_usage_diff += agg_state.estimated_size() as i64; + agg_states.push(agg_state); + } + groups.try_insert(key, agg_states).unwrap(); + } - let enable_spill = true; - let mut need_to_spill = false; + if !self.mem_context.add(memory_usage_diff) { + warn!("not enough memory to load one partition agg state after spill which is not a normal case, so keep going"); + } + } + } let mut input_stream = self.child.execute(); // consume all chunks to compute the agg result @@ -534,10 +588,6 @@ impl HashAggExecutor { // Release memory occupied by agg hash map self.mem_context.add(memory_usage_diff); - groups = AggHashMap::::with_hasher_in( - PrecomputedBuildHasher, - self.mem_context.global_allocator(), - ); // Spill input chunks #[for_await] @@ -562,116 +612,83 @@ impl HashAggExecutor { for i in 0..agg_spill_manager.partition_num { let agg_state_stream = agg_spill_manager.read_agg_state_partition(i).await?; + let input_stream = agg_spill_manager.read_input_partition(i).await?; - #[for_await] - for chunk in agg_state_stream { - let chunk = chunk?; - let group_key_indices = (0..self.group_key_columns.len()).collect_vec(); - let keys = K::build_many(&group_key_indices, &chunk); - let mut memory_usage_diff = 0; - for (row_id, key) in keys.into_iter().enumerate() { - let mut agg_states = vec![]; - for i in 0..self.aggs.len() { - let agg = &self.aggs[i]; - let datum = chunk - .row_at(row_id) - .0 - .datum_at(self.group_key_columns.len() + i) - .to_owned_datum(); - let agg_state = agg.decode_state(datum)?; - memory_usage_diff += agg_state.estimated_size() as i64; - agg_states.push(agg_state); - } - groups.try_insert(key, agg_states).unwrap(); - } + let sub_hash_agg_executor: HashAggExecutor = HashAggExecutor::new( + self.aggs.clone(), + self.group_key_columns.clone(), + self.group_key_types.clone(), + self.schema.clone(), + Box::new(WrapStreamExecutor::new(child_schema.clone(), input_stream)), + Some(Box::new(WrapStreamExecutor::new( + self.schema.clone(), + agg_state_stream, + ))), + format!("{}-sub{}", self.identity.clone(), i), + self.chunk_size, + self.mem_context.clone(), + self.shutdown_rx.clone(), + ); - // if !self.mem_context.add(memory_usage_diff) { - // Err(BatchError::OutOfMemory(self.mem_context.mem_limit()))?; - // } - } + println!( + "create sub_hash_agg_executor {} for hash_agg {}", + sub_hash_agg_executor.identity, self.identity + ); - let input_stream = agg_spill_manager.read_input_partition(i).await?; + let sub_hash_agg_stream = Box::new(sub_hash_agg_executor).execute(); #[for_await] - for chunk in input_stream { + for chunk in sub_hash_agg_stream { let chunk = chunk?; - let chunk = StreamChunk::from(chunk); - let keys = K::build_many(self.group_key_columns.as_slice(), &chunk); - let mut memory_usage_diff = 0; - for (row_id, (key, visible)) in keys - .into_iter() - .zip_eq_fast(chunk.visibility().iter()) - .enumerate() - { - if !visible { - continue; - } - let mut new_group = false; - let states = groups.entry(key).or_insert_with(|| { - new_group = true; - self.aggs.iter().map(|agg| agg.create_state()).collect() - }); - - // TODO: currently not a vectorized implementation - for (agg, state) in self.aggs.iter().zip_eq_fast(states) { - if !new_group { - memory_usage_diff -= state.estimated_size() as i64; - } - agg.update_range(state, &chunk, row_id..row_id + 1).await?; - memory_usage_diff += state.estimated_size() as i64; - } - } - // update memory usage - // if !self.mem_context.add(memory_usage_diff) { - // Err(BatchError::OutOfMemory(self.mem_context.mem_limit()))?; - // } + yield chunk; } } - } + return Ok(()); + } else { + // Don't use `into_iter` here, it may cause memory leak. + let mut result = groups.iter_mut(); + let cardinality = self.chunk_size; + loop { + let mut group_builders: Vec<_> = self + .group_key_types + .iter() + .map(|datatype| datatype.create_array_builder(cardinality)) + .collect(); - // Don't use `into_iter` here, it may cause memory leak. - let mut result = groups.iter_mut(); - let cardinality = self.chunk_size; - loop { - let mut group_builders: Vec<_> = self - .group_key_types - .iter() - .map(|datatype| datatype.create_array_builder(cardinality)) - .collect(); - - let mut agg_builders: Vec<_> = self - .aggs - .iter() - .map(|agg| agg.return_type().create_array_builder(cardinality)) - .collect(); - - let mut has_next = false; - let mut array_len = 0; - for (key, states) in result.by_ref().take(cardinality) { - self.shutdown_rx.check()?; - has_next = true; - array_len += 1; - key.deserialize_to_builders(&mut group_builders[..], &self.group_key_types)?; - for ((agg, state), builder) in (self.aggs.iter()) - .zip_eq_fast(states) - .zip_eq_fast(&mut agg_builders) - { - let result = agg.get_result(state).await?; - builder.append(result); + let mut agg_builders: Vec<_> = self + .aggs + .iter() + .map(|agg| agg.return_type().create_array_builder(cardinality)) + .collect(); + + let mut has_next = false; + let mut array_len = 0; + for (key, states) in result.by_ref().take(cardinality) { + self.shutdown_rx.check()?; + has_next = true; + array_len += 1; + key.deserialize_to_builders(&mut group_builders[..], &self.group_key_types)?; + for ((agg, state), builder) in (self.aggs.iter()) + .zip_eq_fast(states) + .zip_eq_fast(&mut agg_builders) + { + let result = agg.get_result(state).await?; + builder.append(result); + } + } + if !has_next { + break; // exit loop } - } - if !has_next { - break; // exit loop - } - let columns = group_builders - .into_iter() - .chain(agg_builders) - .map(|b| b.finish().into()) - .collect::>(); + let columns = group_builders + .into_iter() + .chain(agg_builders) + .map(|b| b.finish().into()) + .collect::>(); - let output = DataChunk::new(columns, array_len); - yield output; + let output = DataChunk::new(columns, array_len); + yield output; + } } } } diff --git a/src/batch/src/executor/utils.rs b/src/batch/src/executor/utils.rs index 9c6f162f02268..4f724ec5416c8 100644 --- a/src/batch/src/executor/utils.rs +++ b/src/batch/src/executor/utils.rs @@ -99,3 +99,28 @@ impl DummyExecutor { #[try_stream(boxed, ok = DataChunk, error = BatchError)] async fn do_nothing() {} } + +pub struct WrapStreamExecutor { + schema: Schema, + stream: BoxedDataChunkStream, +} + +impl WrapStreamExecutor { + pub fn new(schema: Schema, stream: BoxedDataChunkStream) -> Self { + Self { schema, stream } + } +} + +impl Executor for WrapStreamExecutor { + fn schema(&self) -> &Schema { + &self.schema + } + + fn identity(&self) -> &str { + "WrapStreamExecutor" + } + + fn execute(self: Box) -> BoxedDataChunkStream { + self.stream + } +} From 9864614cc57e00808ee79a3cc7a61a30e95d71a1 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Sat, 11 May 2024 17:07:51 +0800 Subject: [PATCH 04/18] add RW_BATCH_SPILL_DIR --- src/batch/src/executor/hash_agg.rs | 40 ++++++++++++++++++++++++------ src/frontend/src/session.rs | 2 +- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/src/batch/src/executor/hash_agg.rs b/src/batch/src/executor/hash_agg.rs index cdc793bdef993..0e84b6e071081 100644 --- a/src/batch/src/executor/hash_agg.rs +++ b/src/batch/src/executor/hash_agg.rs @@ -247,18 +247,28 @@ pub struct SpillOp { op: Operator, } +const DEFAULT_SPILL_CHUNK_SIZE: usize = 1024; +const DEFAULT_SPILL_PARTITION_NUM: usize = 20; +const DEFAULT_SPILL_DIR: &str = "/tmp/"; +const RW_MANAGED_SPILL_DIR: &str = "/rw_batch_spill/"; +const DEFAULT_IO_BUFFER_SIZE: usize = 256 * 1024; + impl SpillOp { pub async fn writer_with(&self, name: &str) -> Result { Ok(self .op .writer_with(name) .append(true) - .buffer(64 * 1024) + .buffer(DEFAULT_IO_BUFFER_SIZE) .await?) } pub async fn reader_with(&self, name: &str) -> Result { - Ok(self.op.reader_with(name).buffer(64 * 1024).await?) + Ok(self + .op + .reader_with(name) + .buffer(DEFAULT_IO_BUFFER_SIZE) + .await?) } } @@ -325,7 +335,12 @@ impl AggSpillManager { spill_chunk_size: usize, ) -> Result { let suffix_uuid = uuid::Uuid::new_v4(); - let dir = format!("/tmp/rw_batch_spill-{}-{}/", agg_identity, suffix_uuid,); + let spill_dir = + std::env::var("RW_BATCH_SPILL_DIR").unwrap_or_else(|_| DEFAULT_SPILL_DIR.to_string()); + let dir = format!( + "/{}/{}/{}-{}/", + spill_dir, RW_MANAGED_SPILL_DIR, agg_identity, suffix_uuid + ); let op = new_spill_op(dir)?; let agg_state_writers = Vec::with_capacity(partition_num); let agg_state_readers = Vec::with_capacity(partition_num); @@ -472,6 +487,14 @@ impl AggSpillManager { let r = self.op.reader_with(&input_partition_file_name).await?; Ok(Self::read_stream(r)) } + + async fn clear_partition(&mut self, partition: usize) -> Result<()> { + let agg_state_partition_file_name = format!("agg-state-p{}", partition); + self.op.delete(&agg_state_partition_file_name).await?; + let input_partition_file_name = format!("input-chunks-p{}", partition); + self.op.delete(&input_partition_file_name).await?; + Ok(()) + } } impl HashAggExecutor { @@ -561,11 +584,11 @@ impl HashAggExecutor { if need_to_spill { let mut agg_spill_manager = AggSpillManager::new( &self.identity, - 4, + DEFAULT_SPILL_PARTITION_NUM, self.group_key_types.clone(), self.aggs.iter().map(|agg| agg.return_type()).collect(), child_schema.data_types(), - 1024, + DEFAULT_SPILL_CHUNK_SIZE, )?; agg_spill_manager.init_writers().await?; @@ -630,8 +653,8 @@ impl HashAggExecutor { self.shutdown_rx.clone(), ); - println!( - "create sub_hash_agg_executor {} for hash_agg {}", + debug!( + "create sub_hash_agg {} for hash_agg {} to spill", sub_hash_agg_executor.identity, self.identity ); @@ -642,8 +665,9 @@ impl HashAggExecutor { let chunk = chunk?; yield chunk; } + + agg_spill_manager.clear_partition(i).await?; } - return Ok(()); } else { // Don't use `into_iter` here, it may cause memory leak. let mut result = groups.iter_mut(); diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 071608ac2f5e2..c18cc1403717f 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -165,7 +165,7 @@ pub struct FrontendEnv { type SessionMapRef = Arc>>>; /// The proportion of frontend memory used for batch processing. -const FRONTEND_BATCH_MEMORY_PROPORTION: f64 = 0.5; +const FRONTEND_BATCH_MEMORY_PROPORTION: f64 = 0.0005; impl FrontendEnv { pub fn mock() -> Self { From c3338268d3f0c81ca559e7cc866c874d16e09747 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Mon, 13 May 2024 14:40:33 +0800 Subject: [PATCH 05/18] revert FRONTEND_BATCH_MEMORY_PROPORTION --- src/frontend/src/session.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index c18cc1403717f..071608ac2f5e2 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -165,7 +165,7 @@ pub struct FrontendEnv { type SessionMapRef = Arc>>>; /// The proportion of frontend memory used for batch processing. -const FRONTEND_BATCH_MEMORY_PROPORTION: f64 = 0.0005; +const FRONTEND_BATCH_MEMORY_PROPORTION: f64 = 0.5; impl FrontendEnv { pub fn mock() -> Self { From af561b0dcccc7a69b045210b3c2f9cc6e2614564 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Tue, 14 May 2024 14:35:57 +0800 Subject: [PATCH 06/18] refactor --- src/batch/src/executor/hash_agg.rs | 82 ++------------------------ src/batch/src/lib.rs | 1 + src/batch/src/spill/mod.rs | 15 +++++ src/batch/src/spill/spill_op.rs | 93 ++++++++++++++++++++++++++++++ 4 files changed, 114 insertions(+), 77 deletions(-) create mode 100644 src/batch/src/spill/mod.rs create mode 100644 src/batch/src/spill/spill_op.rs diff --git a/src/batch/src/executor/hash_agg.rs b/src/batch/src/executor/hash_agg.rs index 0e84b6e071081..603c98844abda 100644 --- a/src/batch/src/executor/hash_agg.rs +++ b/src/batch/src/executor/hash_agg.rs @@ -14,7 +14,6 @@ use std::hash::BuildHasher; use std::marker::PhantomData; -use std::ops::{Deref, DerefMut}; use std::sync::Arc; use anyhow::anyhow; @@ -22,9 +21,6 @@ use bytes::Bytes; use futures_async_stream::try_stream; use futures_util::AsyncReadExt; use itertools::Itertools; -use opendal::layers::RetryLayer; -use opendal::services::Fs; -use opendal::Operator; use prost::Message; use risingwave_common::array::{DataChunk, StreamChunk}; use risingwave_common::buffer::Bitmap; @@ -48,6 +44,7 @@ use crate::executor::{ BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, WrapStreamExecutor, }; +use crate::spill::spill_op::{SpillOp, DEFAULT_SPILL_PARTITION_NUM}; use crate::task::{BatchTaskContext, ShutdownToken, TaskId}; type AggHashMap = hashbrown::HashMap, PrecomputedBuildHasher, A>; @@ -232,72 +229,6 @@ impl Executor for HashAggExecutor { } } -pub fn new_spill_op(root: String) -> Result { - assert!(root.ends_with('/')); - let mut builder = Fs::default(); - builder.root(&root); - - let op: Operator = Operator::new(builder)? - .layer(RetryLayer::default()) - .finish(); - Ok(SpillOp { op }) -} - -pub struct SpillOp { - op: Operator, -} - -const DEFAULT_SPILL_CHUNK_SIZE: usize = 1024; -const DEFAULT_SPILL_PARTITION_NUM: usize = 20; -const DEFAULT_SPILL_DIR: &str = "/tmp/"; -const RW_MANAGED_SPILL_DIR: &str = "/rw_batch_spill/"; -const DEFAULT_IO_BUFFER_SIZE: usize = 256 * 1024; - -impl SpillOp { - pub async fn writer_with(&self, name: &str) -> Result { - Ok(self - .op - .writer_with(name) - .append(true) - .buffer(DEFAULT_IO_BUFFER_SIZE) - .await?) - } - - pub async fn reader_with(&self, name: &str) -> Result { - Ok(self - .op - .reader_with(name) - .buffer(DEFAULT_IO_BUFFER_SIZE) - .await?) - } -} - -impl Drop for SpillOp { - fn drop(&mut self) { - let op = self.op.clone(); - tokio::task::spawn(async move { - let result = op.remove_all("/").await; - if let Err(e) = result { - error!("Failed to remove spill directory: {}", e); - } - }); - } -} - -impl DerefMut for SpillOp { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.op - } -} - -impl Deref for SpillOp { - type Target = Operator; - - fn deref(&self) -> &Self::Target { - &self.op - } -} - #[derive(Default, Clone, Copy)] pub struct SpillBuildHasher(u64); @@ -309,6 +240,8 @@ impl BuildHasher for SpillBuildHasher { } } +const DEFAULT_SPILL_CHUNK_SIZE: usize = 1024; + pub struct AggSpillManager { op: SpillOp, partition_num: usize, @@ -335,13 +268,8 @@ impl AggSpillManager { spill_chunk_size: usize, ) -> Result { let suffix_uuid = uuid::Uuid::new_v4(); - let spill_dir = - std::env::var("RW_BATCH_SPILL_DIR").unwrap_or_else(|_| DEFAULT_SPILL_DIR.to_string()); - let dir = format!( - "/{}/{}/{}-{}/", - spill_dir, RW_MANAGED_SPILL_DIR, agg_identity, suffix_uuid - ); - let op = new_spill_op(dir)?; + let dir = format!("/{}-{}/", agg_identity, suffix_uuid); + let op = SpillOp::create(dir)?; let agg_state_writers = Vec::with_capacity(partition_num); let agg_state_readers = Vec::with_capacity(partition_num); let agg_state_chunk_builder = Vec::with_capacity(partition_num); diff --git a/src/batch/src/lib.rs b/src/batch/src/lib.rs index d937c64826550..422190eb1c044 100644 --- a/src/batch/src/lib.rs +++ b/src/batch/src/lib.rs @@ -41,6 +41,7 @@ pub mod execution; pub mod executor; pub mod monitor; pub mod rpc; +mod spill; pub mod task; pub mod worker_manager; diff --git a/src/batch/src/spill/mod.rs b/src/batch/src/spill/mod.rs new file mode 100644 index 0000000000000..6af1eae7429c6 --- /dev/null +++ b/src/batch/src/spill/mod.rs @@ -0,0 +1,15 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod spill_op; diff --git a/src/batch/src/spill/spill_op.rs b/src/batch/src/spill/spill_op.rs new file mode 100644 index 0000000000000..4e3f41a88acb4 --- /dev/null +++ b/src/batch/src/spill/spill_op.rs @@ -0,0 +1,93 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::ops::{Deref, DerefMut}; +use std::result::Result::{Err, Ok}; + +use opendal::layers::RetryLayer; +use opendal::services::Fs; +use opendal::Operator; + +use crate::error::Result; + +const RW_BATCH_SPILL_DIR_ENV: &str = "RW_BATCH_SPILL_DIR"; +pub const DEFAULT_SPILL_PARTITION_NUM: usize = 20; +const DEFAULT_SPILL_DIR: &str = "/tmp/"; +const RW_MANAGED_SPILL_DIR: &str = "/rw_batch_spill/"; +const DEFAULT_IO_BUFFER_SIZE: usize = 256 * 1024; + +pub struct SpillOp { + pub op: Operator, +} + +impl SpillOp { + pub fn create(path: String) -> Result { + assert!(path.ends_with('/')); + + let spill_dir = + std::env::var(RW_BATCH_SPILL_DIR_ENV).unwrap_or_else(|_| DEFAULT_SPILL_DIR.to_string()); + let root = format!("/{}/{}/{}/", spill_dir, RW_MANAGED_SPILL_DIR, path); + + let mut builder = Fs::default(); + builder.root(&root); + + let op: Operator = Operator::new(builder)? + .layer(RetryLayer::default()) + .finish(); + Ok(SpillOp { op }) + } + + pub async fn writer_with(&self, name: &str) -> Result { + Ok(self + .op + .writer_with(name) + .append(true) + .buffer(DEFAULT_IO_BUFFER_SIZE) + .await?) + } + + pub async fn reader_with(&self, name: &str) -> Result { + Ok(self + .op + .reader_with(name) + .buffer(DEFAULT_IO_BUFFER_SIZE) + .await?) + } +} + +impl Drop for SpillOp { + fn drop(&mut self) { + let op = self.op.clone(); + tokio::task::spawn(async move { + let result = op.remove_all("/").await; + if let Err(e) = result { + error!("Failed to remove spill directory: {}", e); + } + }); + } +} + +impl DerefMut for SpillOp { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.op + } +} + +impl Deref for SpillOp { + type Target = Operator; + + fn deref(&self) -> &Self::Target { + &self.op + } +} From b16e39b9bc2aaecd92d8f61f762f98870a89d6ab Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Tue, 14 May 2024 15:00:29 +0800 Subject: [PATCH 07/18] add enable_spill --- src/batch/src/executor/hash_agg.rs | 21 ++++++++++++++++----- src/common/src/config.rs | 7 +++++++ src/config/example.toml | 1 + 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/src/batch/src/executor/hash_agg.rs b/src/batch/src/executor/hash_agg.rs index 603c98844abda..e27625aa66353 100644 --- a/src/batch/src/executor/hash_agg.rs +++ b/src/batch/src/executor/hash_agg.rs @@ -64,6 +64,7 @@ impl HashKeyDispatcher for HashAggExecutorBuilder { self.identity, self.chunk_size, self.mem_context, + self.enable_spill, self.shutdown_rx, )) } @@ -83,6 +84,7 @@ pub struct HashAggExecutorBuilder { identity: String, chunk_size: usize, mem_context: MemoryContext, + enable_spill: bool, shutdown_rx: ShutdownToken, } @@ -94,6 +96,7 @@ impl HashAggExecutorBuilder { identity: String, chunk_size: usize, mem_context: MemoryContext, + enable_spill: bool, shutdown_rx: ShutdownToken, ) -> Result { let aggs: Vec<_> = hash_agg_node @@ -132,6 +135,7 @@ impl HashAggExecutorBuilder { identity, chunk_size, mem_context, + enable_spill, shutdown_rx, }; @@ -161,6 +165,7 @@ impl BoxedExecutorBuilder for HashAggExecutorBuilder { identity.clone(), source.context.get_config().developer.chunk_size, source.context.create_executor_mem_context(identity), + source.context.get_config().enable_spill, source.shutdown_rx.clone(), ) } @@ -182,11 +187,13 @@ pub struct HashAggExecutor { identity: String, chunk_size: usize, mem_context: MemoryContext, + enable_spill: bool, shutdown_rx: ShutdownToken, _phantom: PhantomData, } impl HashAggExecutor { + #[allow(clippy::too_many_arguments)] pub fn new( aggs: Arc>, group_key_columns: Vec, @@ -197,6 +204,7 @@ impl HashAggExecutor { identity: String, chunk_size: usize, mem_context: MemoryContext, + enable_spill: bool, shutdown_rx: ShutdownToken, ) -> Self { HashAggExecutor { @@ -209,6 +217,7 @@ impl HashAggExecutor { identity, chunk_size, mem_context, + enable_spill, shutdown_rx, _phantom: PhantomData, } @@ -429,7 +438,6 @@ impl HashAggExecutor { #[try_stream(boxed, ok = DataChunk, error = BatchError)] async fn do_execute(self: Box) { let child_schema = self.child.schema().clone(); - let enable_spill = true; let mut need_to_spill = false; // hash map for each agg groups @@ -500,7 +508,7 @@ impl HashAggExecutor { } // update memory usage if !self.mem_context.add(memory_usage_diff) { - if enable_spill { + if self.enable_spill { need_to_spill = true; break; } else { @@ -521,7 +529,7 @@ impl HashAggExecutor { agg_spill_manager.init_writers().await?; let mut memory_usage_diff = 0; - // Spill agg states + // Spill agg states. for (key, states) in groups { let key_row = key.deserialize(&self.group_key_types)?; let mut agg_datums = vec![]; @@ -537,10 +545,10 @@ impl HashAggExecutor { .await?; } - // Release memory occupied by agg hash map + // Release memory occupied by agg hash map. self.mem_context.add(memory_usage_diff); - // Spill input chunks + // Spill input chunks. #[for_await] for chunk in input_stream { let chunk: DataChunk = chunk?; @@ -561,6 +569,7 @@ impl HashAggExecutor { agg_spill_manager.close_writers().await?; + // Process each partition one by one. for i in 0..agg_spill_manager.partition_num { let agg_state_stream = agg_spill_manager.read_agg_state_partition(i).await?; let input_stream = agg_spill_manager.read_input_partition(i).await?; @@ -578,6 +587,7 @@ impl HashAggExecutor { format!("{}-sub{}", self.identity.clone(), i), self.chunk_size, self.mem_context.clone(), + self.enable_spill, self.shutdown_rx.clone(), ); @@ -594,6 +604,7 @@ impl HashAggExecutor { yield chunk; } + // Clear files of the current partition. agg_spill_manager.clear_partition(i).await?; } } else { diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 2aaff2f500c39..98238695f3f26 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -509,6 +509,9 @@ pub struct BatchConfig { /// This is the secs used to mask a worker unavailable temporarily. #[serde(default = "default::batch::mask_worker_temporary_secs")] pub mask_worker_temporary_secs: usize, + + #[serde(default = "default::batch::enable_spill")] + pub enable_spill: bool, } /// The section `[streaming]` in `risingwave.toml`. @@ -1665,6 +1668,10 @@ pub mod default { false } + pub fn enable_spill() -> bool { + true + } + pub fn statement_timeout_in_sec() -> u32 { // 1 hour 60 * 60 diff --git a/src/config/example.toml b/src/config/example.toml index abdb1e77353fe..13fe589b6147b 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -83,6 +83,7 @@ enable_barrier_read = false statement_timeout_in_sec = 3600 frontend_compute_runtime_worker_threads = 4 mask_worker_temporary_secs = 30 +enable_spill = true [batch.developer] batch_connector_message_buffer_size = 16 From 0f0d2cf6810dfe6813675df4c20c349329daf72c Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Wed, 15 May 2024 18:16:23 +0800 Subject: [PATCH 08/18] add comments --- src/batch/src/executor/hash_agg.rs | 24 ++++++++++++++++++++++++ src/batch/src/spill/spill_op.rs | 2 +- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/src/batch/src/executor/hash_agg.rs b/src/batch/src/executor/hash_agg.rs index 04a774fb8d3f1..1d3798c0d40b8 100644 --- a/src/batch/src/executor/hash_agg.rs +++ b/src/batch/src/executor/hash_agg.rs @@ -251,6 +251,20 @@ impl BuildHasher for SpillBuildHasher { const DEFAULT_SPILL_CHUNK_SIZE: usize = 1024; +/// `AggSpillManager` is used to manage how to write spill data file and read them back. +/// The spill data first need to be partitioned. Each partition contains 2 files: `agg_state_file` and `input_chunks_file`. +/// The spill file consume a data chunk and serialize the chunk into a protobuf bytes. +/// Finally, spill file content will look like the below. +/// The file write pattern is append-only and the read pattern is sequential scan. +/// This can maximize the disk IO performance. +/// +/// ```text +/// [proto_len] +/// [proto_bytes] +/// ... +/// [proto_len] +/// [proto_bytes] +/// ``` pub struct AggSpillManager { op: SpillOp, partition_num: usize, @@ -285,6 +299,7 @@ impl AggSpillManager { let input_writers = Vec::with_capacity(partition_num); let input_readers = Vec::with_capacity(partition_num); let input_chunk_builders = Vec::with_capacity(partition_num); + // Use uuid to generate an unique hasher so that when recursive spilling happens they would use a different hasher to avoid data skew. let spill_build_hasher = SpillBuildHasher(suffix_uuid.as_u64_pair().1); Ok(Self { op, @@ -447,6 +462,8 @@ impl HashAggExecutor { ); if let Some(init_agg_state_executor) = self.init_agg_state_executor { + // `init_agg_state_executor` exists which means this is a sub `HashAggExecutor` used to consume spilling data. + // The spilled agg states by its parent executor need to be recovered first. let mut init_agg_state_stream = init_agg_state_executor.execute(); #[for_await] for chunk in &mut init_agg_state_stream { @@ -518,6 +535,13 @@ impl HashAggExecutor { } if need_to_spill { + // A spilling version of aggregation based on the RFC: Spill Hash Aggregation https://github.com/risingwavelabs/rfcs/pull/89 + // When HashAggExecutor told memory is insufficient, AggSpillManager will start to partition the hash table and spill to disk. + // After spilling the hash table, AggSpillManager will consume all chunks from the input executor, + // partition and spill to disk with the same hash function as the hash table spilling. + // Finally, we would get e.g. 20 partitions. Each partition should contain a portion of the original hash table and input data. + // A sub HashAggExecutor would be used to consume each partition one by one. + // If memory is still not enough in the sub HashAggExecutor, it will partition its hash table and input recursively. let mut agg_spill_manager = AggSpillManager::new( &self.identity, DEFAULT_SPILL_PARTITION_NUM, diff --git a/src/batch/src/spill/spill_op.rs b/src/batch/src/spill/spill_op.rs index 4e3f41a88acb4..45bad3014bbb2 100644 --- a/src/batch/src/spill/spill_op.rs +++ b/src/batch/src/spill/spill_op.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::ops::{Deref, DerefMut}; -use std::result::Result::{Err, Ok}; use opendal::layers::RetryLayer; use opendal::services::Fs; @@ -27,6 +26,7 @@ const DEFAULT_SPILL_DIR: &str = "/tmp/"; const RW_MANAGED_SPILL_DIR: &str = "/rw_batch_spill/"; const DEFAULT_IO_BUFFER_SIZE: usize = 256 * 1024; +/// `SpillOp` is used to manage the spill directory of the spilling executor and it will drop the directory with a RAII style. pub struct SpillOp { pub op: Operator, } From 3834564fdc3d2f6b1690eba0ce1ce1b3f4f3504c Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Wed, 15 May 2024 18:22:55 +0800 Subject: [PATCH 09/18] better error fmt --- src/batch/src/spill/spill_op.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/batch/src/spill/spill_op.rs b/src/batch/src/spill/spill_op.rs index 45bad3014bbb2..6c521127042bb 100644 --- a/src/batch/src/spill/spill_op.rs +++ b/src/batch/src/spill/spill_op.rs @@ -17,6 +17,7 @@ use std::ops::{Deref, DerefMut}; use opendal::layers::RetryLayer; use opendal::services::Fs; use opendal::Operator; +use thiserror_ext::AsReport; use crate::error::Result; @@ -71,8 +72,11 @@ impl Drop for SpillOp { let op = self.op.clone(); tokio::task::spawn(async move { let result = op.remove_all("/").await; - if let Err(e) = result { - error!("Failed to remove spill directory: {}", e); + if let Err(error) = result { + error!( + error = %error.as_report(), + "Failed to remove spill directory" + ); } }); } From 757c1f7839973cfd158a4e4234cadf94ce8f86d6 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Wed, 15 May 2024 18:47:48 +0800 Subject: [PATCH 10/18] fix tests --- src/batch/benches/hash_agg.rs | 1 + src/batch/src/executor/hash_agg.rs | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/batch/benches/hash_agg.rs b/src/batch/benches/hash_agg.rs index a918541beea18..0a15bd368e9a8 100644 --- a/src/batch/benches/hash_agg.rs +++ b/src/batch/benches/hash_agg.rs @@ -100,6 +100,7 @@ fn create_hash_agg_executor( group_key_types, schema, input, + None, "HashAggExecutor".to_string(), CHUNK_SIZE, MemoryContext::none(), diff --git a/src/batch/src/executor/hash_agg.rs b/src/batch/src/executor/hash_agg.rs index 1d3798c0d40b8..ee9be5147f05e 100644 --- a/src/batch/src/executor/hash_agg.rs +++ b/src/batch/src/executor/hash_agg.rs @@ -685,7 +685,6 @@ mod tests { use std::alloc::{AllocError, Allocator, Global, Layout}; use std::ptr::NonNull; use std::sync::atomic::{AtomicBool, Ordering}; - use std::sync::Arc; use futures_async_stream::for_await; use risingwave_common::metrics::LabelGuardedIntGauge; @@ -829,6 +828,7 @@ mod tests { "HashAggExecutor".to_string(), CHUNK_SIZE, MemoryContext::none(), + false, ShutdownToken::empty(), ) .unwrap(); @@ -943,6 +943,7 @@ mod tests { "HashAggExecutor".to_string(), CHUNK_SIZE, MemoryContext::none(), + false, shutdown_rx, ) .unwrap(); From dee02d236e3964c992d6d5903f3991e902185db7 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Thu, 16 May 2024 11:24:23 +0800 Subject: [PATCH 11/18] fix test and add io concurrent --- src/batch/benches/hash_agg.rs | 5 ++++- src/batch/src/executor/hash_agg.rs | 4 +++- src/batch/src/spill/spill_op.rs | 2 ++ 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/batch/benches/hash_agg.rs b/src/batch/benches/hash_agg.rs index 0a15bd368e9a8..cd83d6ce3209e 100644 --- a/src/batch/benches/hash_agg.rs +++ b/src/batch/benches/hash_agg.rs @@ -13,6 +13,8 @@ // limitations under the License. pub mod utils; +use std::sync::Arc; + use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion}; use itertools::Itertools; use risingwave_batch::executor::aggregation::build as build_agg; @@ -95,7 +97,7 @@ fn create_hash_agg_executor( let schema = Schema { fields }; Box::new(HashAggExecutor::::new( - agg_init_states, + Arc::new(agg_init_states), group_key_columns, group_key_types, schema, @@ -104,6 +106,7 @@ fn create_hash_agg_executor( "HashAggExecutor".to_string(), CHUNK_SIZE, MemoryContext::none(), + false, ShutdownToken::empty(), )) } diff --git a/src/batch/src/executor/hash_agg.rs b/src/batch/src/executor/hash_agg.rs index ee9be5147f05e..e9dcb20f76e0f 100644 --- a/src/batch/src/executor/hash_agg.rs +++ b/src/batch/src/executor/hash_agg.rs @@ -19,7 +19,8 @@ use std::sync::Arc; use anyhow::anyhow; use bytes::Bytes; use futures_async_stream::try_stream; -use futures_util::AsyncReadExt; +use futures_util::future::try_join_all; +use futures_util::{AsyncReadExt, StreamExt}; use itertools::Itertools; use prost::Message; use risingwave_common::array::{DataChunk, StreamChunk}; @@ -757,6 +758,7 @@ mod tests { "HashAggExecutor".to_string(), CHUNK_SIZE, mem_context.clone(), + false, ShutdownToken::empty(), ) .unwrap(); diff --git a/src/batch/src/spill/spill_op.rs b/src/batch/src/spill/spill_op.rs index 6c521127042bb..07511ab73f9ab 100644 --- a/src/batch/src/spill/spill_op.rs +++ b/src/batch/src/spill/spill_op.rs @@ -26,6 +26,7 @@ pub const DEFAULT_SPILL_PARTITION_NUM: usize = 20; const DEFAULT_SPILL_DIR: &str = "/tmp/"; const RW_MANAGED_SPILL_DIR: &str = "/rw_batch_spill/"; const DEFAULT_IO_BUFFER_SIZE: usize = 256 * 1024; +const DEFAULT_IO_CONCURRENT_TASK: usize = 8; /// `SpillOp` is used to manage the spill directory of the spilling executor and it will drop the directory with a RAII style. pub struct SpillOp { @@ -55,6 +56,7 @@ impl SpillOp { .writer_with(name) .append(true) .buffer(DEFAULT_IO_BUFFER_SIZE) + .concurrent(DEFAULT_IO_CONCURRENT_TASK) .await?) } From 637ec5f131ac2d0b59ad8b58c29212e3ea65604d Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Thu, 16 May 2024 11:25:45 +0800 Subject: [PATCH 12/18] fmt --- src/batch/src/executor/hash_agg.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/batch/src/executor/hash_agg.rs b/src/batch/src/executor/hash_agg.rs index e9dcb20f76e0f..805f25f6d1e43 100644 --- a/src/batch/src/executor/hash_agg.rs +++ b/src/batch/src/executor/hash_agg.rs @@ -19,8 +19,7 @@ use std::sync::Arc; use anyhow::anyhow; use bytes::Bytes; use futures_async_stream::try_stream; -use futures_util::future::try_join_all; -use futures_util::{AsyncReadExt, StreamExt}; +use futures_util::AsyncReadExt; use itertools::Itertools; use prost::Message; use risingwave_common::array::{DataChunk, StreamChunk}; From 5e8caa6096b96972e2f6443a964618643a2eeac4 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Thu, 16 May 2024 12:03:30 +0800 Subject: [PATCH 13/18] fix test --- src/config/docs.md | 1 + 1 file changed, 1 insertion(+) diff --git a/src/config/docs.md b/src/config/docs.md index d8f551f96fdf4..17b54fbc0f120 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -8,6 +8,7 @@ This page is automatically generated by `./risedev generate-example-config` |--------|-------------|---------| | distributed_query_limit | This is the max number of queries per sql session. | | | enable_barrier_read | | false | +| enable_spill | | true | | frontend_compute_runtime_worker_threads | frontend compute runtime worker threads | 4 | | mask_worker_temporary_secs | This is the secs used to mask a worker unavailable temporarily. | 30 | | max_batch_queries_per_frontend_node | This is the max number of batch queries per frontend node. | | From d8badc630fb98095cd94541b333b86bb5664f4d7 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Wed, 29 May 2024 18:24:09 +0800 Subject: [PATCH 14/18] refine based on the review --- src/batch/src/error.rs | 2 +- src/batch/src/executor/hash_agg.rs | 78 +++++++++++++++++++----------- src/common/src/config.rs | 1 + 3 files changed, 51 insertions(+), 30 deletions(-) diff --git a/src/batch/src/error.rs b/src/batch/src/error.rs index dbf981ccc32ff..27f355aed48b3 100644 --- a/src/batch/src/error.rs +++ b/src/batch/src/error.rs @@ -140,7 +140,7 @@ pub enum BatchError { #[error("Not enough memory to run this query, batch memory limit is {0} bytes")] OutOfMemory(u64), - #[error(transparent)] + #[error("Failed to spill out to disk")] Spill( #[from] #[backtrace] diff --git a/src/batch/src/executor/hash_agg.rs b/src/batch/src/executor/hash_agg.rs index e50acbf477a71..75f86bd97a537 100644 --- a/src/batch/src/executor/hash_agg.rs +++ b/src/batch/src/executor/hash_agg.rs @@ -61,7 +61,6 @@ impl HashKeyDispatcher for HashAggExecutorBuilder { self.group_key_types, self.schema, self.child, - None, self.identity, self.chunk_size, self.mem_context, @@ -194,8 +193,34 @@ pub struct HashAggExecutor { } impl HashAggExecutor { - #[allow(clippy::too_many_arguments)] pub fn new( + aggs: Arc>, + group_key_columns: Vec, + group_key_types: Vec, + schema: Schema, + child: BoxedExecutor, + identity: String, + chunk_size: usize, + mem_context: MemoryContext, + enable_spill: bool, + shutdown_rx: ShutdownToken, + ) -> Self { + Self::new_with_init_agg_state( + aggs, + group_key_columns, + group_key_types, + schema, + child, + None, + identity, + chunk_size, + mem_context, + enable_spill, + shutdown_rx, + ) + } + + fn new_with_init_agg_state( aggs: Arc>, group_key_columns: Vec, group_key_types: Vec, @@ -250,8 +275,6 @@ impl BuildHasher for SpillBuildHasher { } } -const DEFAULT_SPILL_CHUNK_SIZE: usize = 1024; - /// `AggSpillManager` is used to manage how to write spill data file and read them back. /// The spill data first need to be partitioned. Each partition contains 2 files: `agg_state_file` and `input_chunks_file`. /// The spill file consume a data chunk and serialize the chunk into a protobuf bytes. @@ -283,7 +306,7 @@ pub struct AggSpillManager { } impl AggSpillManager { - pub fn new( + fn new( agg_identity: &String, partition_num: usize, group_key_types: Vec, @@ -319,7 +342,7 @@ impl AggSpillManager { }) } - pub async fn init_writers(&mut self) -> Result<()> { + async fn init_writers(&mut self) -> Result<()> { for i in 0..self.partition_num { let agg_state_partition_file_name = format!("agg-state-p{}", i); let w = self.op.writer_with(&agg_state_partition_file_name).await?; @@ -344,7 +367,7 @@ impl AggSpillManager { Ok(()) } - pub async fn write_agg_state_row(&mut self, row: impl Row, hash_code: u64) -> Result<()> { + async fn write_agg_state_row(&mut self, row: impl Row, hash_code: u64) -> Result<()> { let partition = hash_code as usize % self.partition_num; if let Some(output_chunk) = self.agg_state_chunk_builder[partition].append_one_row(row) { let chunk_pb: PbDataChunk = output_chunk.to_protobuf(); @@ -356,11 +379,7 @@ impl AggSpillManager { Ok(()) } - pub async fn write_input_chunk( - &mut self, - chunk: DataChunk, - hash_codes: Vec, - ) -> Result<()> { + async fn write_input_chunk(&mut self, chunk: DataChunk, hash_codes: Vec) -> Result<()> { let (columns, vis) = chunk.into_parts_v2(); for partition in 0..self.partition_num { let new_vis = vis.clone() @@ -381,7 +400,7 @@ impl AggSpillManager { Ok(()) } - pub async fn close_writers(&mut self) -> Result<()> { + async fn close_writers(&mut self) -> Result<()> { for partition in 0..self.partition_num { if let Some(output_chunk) = self.agg_state_chunk_builder[partition].consume_all() { let chunk_pb: PbDataChunk = output_chunk.to_protobuf(); @@ -557,7 +576,7 @@ impl HashAggExecutor { self.group_key_types.clone(), self.aggs.iter().map(|agg| agg.return_type()).collect(), child_schema.data_types(), - DEFAULT_SPILL_CHUNK_SIZE, + self.chunk_size, )?; agg_spill_manager.init_writers().await?; @@ -607,22 +626,23 @@ impl HashAggExecutor { let agg_state_stream = agg_spill_manager.read_agg_state_partition(i).await?; let input_stream = agg_spill_manager.read_input_partition(i).await?; - let sub_hash_agg_executor: HashAggExecutor = HashAggExecutor::new( - self.aggs.clone(), - self.group_key_columns.clone(), - self.group_key_types.clone(), - self.schema.clone(), - Box::new(WrapStreamExecutor::new(child_schema.clone(), input_stream)), - Some(Box::new(WrapStreamExecutor::new( + let sub_hash_agg_executor: HashAggExecutor = + HashAggExecutor::new_with_init_agg_state( + self.aggs.clone(), + self.group_key_columns.clone(), + self.group_key_types.clone(), self.schema.clone(), - agg_state_stream, - ))), - format!("{}-sub{}", self.identity.clone(), i), - self.chunk_size, - self.mem_context.clone(), - self.enable_spill, - self.shutdown_rx.clone(), - ); + Box::new(WrapStreamExecutor::new(child_schema.clone(), input_stream)), + Some(Box::new(WrapStreamExecutor::new( + self.schema.clone(), + agg_state_stream, + ))), + format!("{}-sub{}", self.identity.clone(), i), + self.chunk_size, + self.mem_context.clone(), + self.enable_spill, + self.shutdown_rx.clone(), + ); debug!( "create sub_hash_agg {} for hash_agg {} to spill", diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 6329d290863a8..c8da0f6dce5e9 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -536,6 +536,7 @@ pub struct BatchConfig { #[serde(default = "default::batch::redact_sql_option_keywords")] pub redact_sql_option_keywords: Vec, + /// Enable the spill out to disk feature for batch queries. #[serde(default = "default::batch::enable_spill")] pub enable_spill: bool, } From 62bc4d670ab19ba6b8dc1532ea052d2dbe3afe01 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Wed, 29 May 2024 18:34:00 +0800 Subject: [PATCH 15/18] doc --- src/config/docs.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/config/docs.md b/src/config/docs.md index 3175f5a0c7486..018c9dd41087c 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -8,7 +8,7 @@ This page is automatically generated by `./risedev generate-example-config` |--------|-------------|---------| | distributed_query_limit | This is the max number of queries per sql session. | | | enable_barrier_read | | false | -| enable_spill | | true | +| enable_spill | Enable the spill out to disk feature for batch queries. | true | | frontend_compute_runtime_worker_threads | frontend compute runtime worker threads | 4 | | mask_worker_temporary_secs | This is the secs used to mask a worker unavailable temporarily. | 30 | | max_batch_queries_per_frontend_node | This is the max number of batch queries per frontend node. | | From 3952fc89b9bc4796c5122298b8688b0023fd0ddd Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Wed, 29 May 2024 18:39:47 +0800 Subject: [PATCH 16/18] remove append --- src/batch/src/spill/spill_op.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/batch/src/spill/spill_op.rs b/src/batch/src/spill/spill_op.rs index 07511ab73f9ab..115a0c2d430e1 100644 --- a/src/batch/src/spill/spill_op.rs +++ b/src/batch/src/spill/spill_op.rs @@ -54,7 +54,6 @@ impl SpillOp { Ok(self .op .writer_with(name) - .append(true) .buffer(DEFAULT_IO_BUFFER_SIZE) .concurrent(DEFAULT_IO_CONCURRENT_TASK) .await?) From a5ad9d21791b43178f234808e1b1bd4d21c9ca28 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Wed, 29 May 2024 18:52:16 +0800 Subject: [PATCH 17/18] fmt --- src/batch/src/executor/hash_agg.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/batch/src/executor/hash_agg.rs b/src/batch/src/executor/hash_agg.rs index 75f86bd97a537..cb4adcecdc8c7 100644 --- a/src/batch/src/executor/hash_agg.rs +++ b/src/batch/src/executor/hash_agg.rs @@ -220,6 +220,7 @@ impl HashAggExecutor { ) } + #[allow(clippy::too_many_arguments)] fn new_with_init_agg_state( aggs: Arc>, group_key_columns: Vec, From 30c5cc12f8aab3e3bf121870c8ae935de5911dab Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Wed, 29 May 2024 19:05:40 +0800 Subject: [PATCH 18/18] fix --- src/batch/benches/hash_agg.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/batch/benches/hash_agg.rs b/src/batch/benches/hash_agg.rs index 46961bc6663aa..e91564692dc95 100644 --- a/src/batch/benches/hash_agg.rs +++ b/src/batch/benches/hash_agg.rs @@ -103,7 +103,6 @@ fn create_hash_agg_executor( group_key_types, schema, input, - None, "HashAggExecutor".to_string(), CHUNK_SIZE, MemoryContext::none(),