Skip to content

Commit

Permalink
fix(batch): fix hash join process some remaining chunk builder
Browse files Browse the repository at this point in the history
  • Loading branch information
st1page committed Mar 26, 2024
1 parent 7ce1b6a commit 7945b84
Showing 1 changed file with 10 additions and 21 deletions.
31 changes: 10 additions & 21 deletions src/batch/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
}: EquiJoinParams<K>,
cond: &BoxedExpression,
) {
let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
let mut chunk_builder = DataChunkBuilder::new(full_data_types.clone(), chunk_size);
let mut remaining_chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
let mut non_equi_state = LeftNonEquiJoinState {
probe_column_count: probe_data_types.len(),
..Default::default()
Expand Down Expand Up @@ -527,16 +528,11 @@ impl<K: HashKey> HashJoinExecutor<K> {
shutdown_rx.check()?;
let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
if let Some(spilled) = Self::append_one_row_with_null_build_side(
&mut chunk_builder,
&mut remaining_chunk_builder,
probe_row,
build_data_types.len(),
) {
yield Self::process_left_outer_join_non_equi_condition(
spilled,
cond.as_ref(),
&mut non_equi_state,
)
.await?
yield spilled
}
}
}
Expand All @@ -550,6 +546,10 @@ impl<K: HashKey> HashJoinExecutor<K> {
)
.await?
}

if let Some(spilled) = remaining_chunk_builder.consume_all() {
yield spilled
}
}

#[try_stream(boxed, ok = DataChunk, error = BatchError)]
Expand Down Expand Up @@ -749,12 +749,7 @@ impl<K: HashKey> HashJoinExecutor<K> {
&probe_chunk,
probe_row_id,
) {
yield Self::process_left_semi_anti_join_non_equi_condition::<true>(
spilled,
cond.as_ref(),
&mut non_equi_state,
)
.await?
yield spilled
}
}
}
Expand Down Expand Up @@ -1187,13 +1182,7 @@ impl<K: HashKey> HashJoinExecutor<K> {
probe_row,
build_data_types.len(),
) {
yield Self::process_full_outer_join_non_equi_condition(
spilled,
cond.as_ref(),
&mut left_non_equi_state,
&mut right_non_equi_state,
)
.await?
yield spilled
}
}
}
Expand Down

0 comments on commit 7945b84

Please sign in to comment.