Skip to content

Commit

Permalink
feat(batch): support spill hash join (#17122)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Jun 5, 2024
1 parent e4f1c48 commit 030f2fa
Show file tree
Hide file tree
Showing 4 changed files with 565 additions and 134 deletions.
3 changes: 2 additions & 1 deletion src/batch/benches/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ fn create_hash_join_executor(
_ => vec![0, 1],
};

let cond = with_cond.then(|| build_from_pretty("(greater_than:int8 $0:int8 100:int8)"));
let cond = with_cond.then(|| build_from_pretty("(greater_than:int8 $0:int8 100:int8)").into());

Box::new(HashJoinExecutor::<hash::Key64>::new(
join_type,
Expand All @@ -74,6 +74,7 @@ fn create_hash_join_executor(
cond,
"HashJoinExecutor".into(),
CHUNK_SIZE,
false,
ShutdownToken::empty(),
MemoryContext::none(),
))
Expand Down
54 changes: 6 additions & 48 deletions src/batch/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@ use std::hash::BuildHasher;
use std::marker::PhantomData;
use std::sync::Arc;

use anyhow::anyhow;
use bytes::Bytes;
use futures_async_stream::try_stream;
use futures_util::AsyncReadExt;
use hashbrown::hash_map::Entry;
use itertools::Itertools;
use prost::Message;
Expand All @@ -37,15 +35,16 @@ use risingwave_expr::aggregate::{AggCall, AggregateState, BoxedAggregateFunction
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::HashAggNode;
use risingwave_pb::data::DataChunk as PbDataChunk;
use twox_hash::XxHash64;

use crate::error::{BatchError, Result};
use crate::executor::aggregation::build as build_agg;
use crate::executor::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
WrapStreamExecutor,
};
use crate::spill::spill_op::{SpillOp, DEFAULT_SPILL_PARTITION_NUM};
use crate::spill::spill_op::{
SpillBuildHasher, SpillOp, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY,
};
use crate::task::{BatchTaskContext, ShutdownToken, TaskId};

type AggHashMap<K, A> = hashbrown::HashMap<K, Vec<AggregateState>, PrecomputedBuildHasher, A>;
Expand Down Expand Up @@ -271,17 +270,6 @@ impl<K: HashKey + Send + Sync> Executor for HashAggExecutor<K> {
}
}

/// A [`BuildHasher`] that carries a `u64` seed and hands out [`XxHash64`] hashers
/// created with that seed.
#[derive(Default, Clone, Copy)]
pub struct SpillBuildHasher(u64);

impl BuildHasher for SpillBuildHasher {
    type Hasher = XxHash64;

    /// Returns a new `XxHash64` initialised with the seed stored in this builder.
    fn build_hasher(&self) -> XxHash64 {
        let Self(seed) = *self;
        XxHash64::with_seed(seed)
    }
}

/// `AggSpillManager` manages how spill data files are written and read back.
/// The spill data first needs to be partitioned. Each partition contains 2 files: `agg_state_file` and `input_chunks_file`.
/// The spill file consumes a data chunk and serializes the chunk into protobuf bytes.
Expand All @@ -300,12 +288,8 @@ pub struct AggSpillManager {
op: SpillOp,
partition_num: usize,
agg_state_writers: Vec<opendal::Writer>,
#[expect(dead_code)]
agg_state_readers: Vec<opendal::Reader>,
agg_state_chunk_builder: Vec<DataChunkBuilder>,
input_writers: Vec<opendal::Writer>,
#[expect(dead_code)]
input_readers: Vec<opendal::Reader>,
input_chunk_builders: Vec<DataChunkBuilder>,
spill_build_hasher: SpillBuildHasher,
group_key_types: Vec<DataType>,
Expand All @@ -327,21 +311,17 @@ impl AggSpillManager {
let dir = format!("/{}-{}/", agg_identity, suffix_uuid);
let op = SpillOp::create(dir)?;
let agg_state_writers = Vec::with_capacity(partition_num);
let agg_state_readers = Vec::with_capacity(partition_num);
let agg_state_chunk_builder = Vec::with_capacity(partition_num);
let input_writers = Vec::with_capacity(partition_num);
let input_readers = Vec::with_capacity(partition_num);
let input_chunk_builders = Vec::with_capacity(partition_num);
// Use the uuid to generate a unique hasher so that, when recursive spilling happens, each level uses a different hasher to avoid data skew.
let spill_build_hasher = SpillBuildHasher(suffix_uuid.as_u64_pair().1);
Ok(Self {
op,
partition_num,
agg_state_writers,
agg_state_readers,
agg_state_chunk_builder,
input_writers,
input_readers,
input_chunk_builders,
spill_build_hasher,
group_key_types,
Expand Down Expand Up @@ -437,36 +417,16 @@ impl AggSpillManager {
Ok(())
}

/// Decodes the bytes from `reader` into a stream of `DataChunk`s.
///
/// As read by the loop below, the input is a sequence of records, each made of a 4-byte
/// little-endian `u32` length followed by that many bytes holding an encoded `PbDataChunk`.
/// The stream ends normally when reading a length prefix fails with `UnexpectedEof`.
/// Any other read error, a protobuf decode error, or a `DataChunk::from_protobuf` error
/// terminates the stream with an error.
#[try_stream(boxed, ok = DataChunk, error = BatchError)]
async fn read_stream(mut reader: opendal::Reader) {
// Fixed-size scratch buffer for the length prefix of the next record.
let mut buf = [0u8; 4];
loop {
// Hitting EOF while reading the prefix is treated as the regular end of the data.
// NOTE(review): presumably a truncated prefix (1-3 trailing bytes) also surfaces as
// `UnexpectedEof` and would be accepted silently — confirm against `read_exact` semantics.
if let Err(err) = reader.read_exact(&mut buf).await {
if err.kind() == std::io::ErrorKind::UnexpectedEof {
break;
} else {
return Err(anyhow!(err).into());
}
}
// The prefix is the payload length in bytes, little-endian.
let len = u32::from_le_bytes(buf) as usize;
// Shadows the prefix buffer for the rest of this iteration with a payload-sized buffer.
// NOTE(review): `len` comes straight from the file and is not bounded before allocating.
let mut buf = vec![0u8; len];
// Unlike the prefix read above, EOF in the middle of a payload is reported as an error.
reader.read_exact(&mut buf).await.map_err(|e| anyhow!(e))?;
let chunk_pb: PbDataChunk = Message::decode(buf.as_slice()).map_err(|e| anyhow!(e))?;
let chunk = DataChunk::from_protobuf(&chunk_pb)?;
yield chunk;
}
}

async fn read_agg_state_partition(&mut self, partition: usize) -> Result<BoxedDataChunkStream> {
let agg_state_partition_file_name = format!("agg-state-p{}", partition);
let r = self.op.reader_with(&agg_state_partition_file_name).await?;
Ok(Self::read_stream(r))
Ok(SpillOp::read_stream(r))
}

async fn read_input_partition(&mut self, partition: usize) -> Result<BoxedDataChunkStream> {
let input_partition_file_name = format!("input-chunks-p{}", partition);
let r = self.op.reader_with(&input_partition_file_name).await?;
Ok(Self::read_stream(r))
Ok(SpillOp::read_stream(r))
}

async fn estimate_partition_size(&self, partition: usize) -> Result<u64> {
Expand Down Expand Up @@ -494,8 +454,6 @@ impl AggSpillManager {
}
}

const SPILL_AT_LEAST_MEMORY: u64 = 1024 * 1024;

impl<K: HashKey + Send + Sync> HashAggExecutor<K> {
#[try_stream(boxed, ok = DataChunk, error = BatchError)]
async fn do_execute(self: Box<Self>) {
Expand Down Expand Up @@ -601,7 +559,7 @@ impl<K: HashKey + Send + Sync> HashAggExecutor<K> {
// partition and spill to disk with the same hash function as the hash table spilling.
// Finally, we would get e.g. 20 partitions. Each partition should contain a portion of the original hash table and input data.
// A sub HashAggExecutor would be used to consume each partition one by one.
// If memory is still not enough in the sub HashAggExecutor, it will partition its hash table and input recursively.
// If memory is still not enough in the sub HashAggExecutor, it will spill its hash table and input recursively.
let mut agg_spill_manager = AggSpillManager::new(
&self.identity,
DEFAULT_SPILL_PARTITION_NUM,
Expand Down
Loading

0 comments on commit 030f2fa

Please sign in to comment.