Skip to content

Commit

Permalink
GroupedHashAggregateStream breaks spill batch (apache#8004)
Browse files Browse the repository at this point in the history
... into smaller chunks to decrease memory required for merging.
  • Loading branch information
milenkovicm authored Nov 1, 2023
1 parent 7788b90 commit 94dac76
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 2 deletions.
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2155,7 +2155,7 @@ mod tests {
spill: bool,
) -> Result<()> {
let task_ctx = if spill {
- new_spill_ctx(2, 2812)
+ new_spill_ctx(2, 2886)
} else {
Arc::new(TaskContext::default())
};
Expand Down
11 changes: 10 additions & 1 deletion datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,16 @@ impl GroupedHashAggregateStream {
let spillfile = self.runtime.disk_manager.create_tmp_file("HashAggSpill")?;
let mut writer = IPCWriter::new(spillfile.path(), &emit.schema())?;
// TODO: slice large `sorted` and write to multiple files in parallel
- writer.write(&sorted)?;
+ let mut offset = 0;
+ let total_rows = sorted.num_rows();
+
+ while offset < total_rows {
+ let length = std::cmp::min(total_rows - offset, self.batch_size);
+ let batch = sorted.slice(offset, length);
+ offset += batch.num_rows();
+ writer.write(&batch)?;
+ }
+
writer.finish()?;
self.spill_state.spills.push(spillfile);
Ok(())
Expand Down

0 comments on commit 94dac76

Please sign in to comment.