Skip to content

Commit

Permalink
fix(batch): fix hash join process some remaining chunk builder (#15912)
Browse files Browse the repository at this point in the history
  • Loading branch information
st1page authored and kwannoel committed Mar 26, 2024
1 parent dba9f93 commit c23d925
Showing 1 changed file with 21 additions and 30 deletions.
51 changes: 21 additions & 30 deletions src/batch/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
}: EquiJoinParams<K>,
cond: &BoxedExpression,
) {
let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
let mut chunk_builder = DataChunkBuilder::new(full_data_types.clone(), chunk_size);
let mut remaining_chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
let mut non_equi_state = LeftNonEquiJoinState {
probe_column_count: probe_data_types.len(),
..Default::default()
Expand All @@ -496,10 +497,11 @@ impl<K: HashKey> HashJoinExecutor<K> {
continue;
}
non_equi_state.found_matched = false;
non_equi_state
.first_output_row_id
.push(chunk_builder.buffered_count());
if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
non_equi_state
.first_output_row_id
.push(chunk_builder.buffered_count());

let mut build_row_id_iter = next_build_row_with_same_key
.row_id_iter(Some(*first_matched_build_row_id))
.peekable();
Expand Down Expand Up @@ -527,16 +529,11 @@ impl<K: HashKey> HashJoinExecutor<K> {
shutdown_rx.check()?;
let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
if let Some(spilled) = Self::append_one_row_with_null_build_side(
&mut chunk_builder,
&mut remaining_chunk_builder,
probe_row,
build_data_types.len(),
) {
yield Self::process_left_outer_join_non_equi_condition(
spilled,
cond.as_ref(),
&mut non_equi_state,
)
.await?
yield spilled
}
}
}
Expand All @@ -550,6 +547,10 @@ impl<K: HashKey> HashJoinExecutor<K> {
)
.await?
}

if let Some(spilled) = remaining_chunk_builder.consume_all() {
yield spilled
}
}

#[try_stream(boxed, ok = DataChunk, error = BatchError)]
Expand Down Expand Up @@ -641,11 +642,12 @@ impl<K: HashKey> HashJoinExecutor<K> {
if !visible {
continue;
}
non_equi_state
.first_output_row_id
.push(chunk_builder.buffered_count());
non_equi_state.found_matched = false;
if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
non_equi_state
.first_output_row_id
.push(chunk_builder.buffered_count());

for build_row_id in
next_build_row_with_same_key.row_id_iter(Some(*first_matched_build_row_id))
{
Expand Down Expand Up @@ -749,12 +751,7 @@ impl<K: HashKey> HashJoinExecutor<K> {
&probe_chunk,
probe_row_id,
) {
yield Self::process_left_semi_anti_join_non_equi_condition::<true>(
spilled,
cond.as_ref(),
&mut non_equi_state,
)
.await?
yield spilled
}
}
}
Expand Down Expand Up @@ -1187,13 +1184,7 @@ impl<K: HashKey> HashJoinExecutor<K> {
probe_row,
build_data_types.len(),
) {
yield Self::process_full_outer_join_non_equi_condition(
spilled,
cond.as_ref(),
&mut left_non_equi_state,
&mut right_non_equi_state,
)
.await?
yield spilled
}
}
}
Expand Down Expand Up @@ -2423,13 +2414,13 @@ mod tests {

let expected_chunk = DataChunk::from_pretty(
"i f i F
1 6.1 . .
2 . . .
. 8.4 . .
3 3.9 . .
. . . .
4 6.6 4 7.5
3 . . .
1 6.1 . .
. 8.4 . .
. . . .
. 0.7 . .
5 . . .
. 5.5 . .",
Expand Down

0 comments on commit c23d925

Please sign in to comment.