diff --git a/src/batch/src/executor/join/hash_join.rs b/src/batch/src/executor/join/hash_join.rs index e20e39595ca2..76b21f67ef41 100644 --- a/src/batch/src/executor/join/hash_join.rs +++ b/src/batch/src/executor/join/hash_join.rs @@ -477,7 +477,8 @@ impl HashJoinExecutor { }: EquiJoinParams, cond: &BoxedExpression, ) { - let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size); + let mut chunk_builder = DataChunkBuilder::new(full_data_types.clone(), chunk_size); + let mut remaining_chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size); let mut non_equi_state = LeftNonEquiJoinState { probe_column_count: probe_data_types.len(), ..Default::default() @@ -496,10 +497,11 @@ impl HashJoinExecutor { continue; } non_equi_state.found_matched = false; - non_equi_state - .first_output_row_id - .push(chunk_builder.buffered_count()); if let Some(first_matched_build_row_id) = hash_map.get(probe_key) { + non_equi_state + .first_output_row_id + .push(chunk_builder.buffered_count()); + let mut build_row_id_iter = next_build_row_with_same_key .row_id_iter(Some(*first_matched_build_row_id)) .peekable(); @@ -527,16 +529,11 @@ impl HashJoinExecutor { shutdown_rx.check()?; let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id); if let Some(spilled) = Self::append_one_row_with_null_build_side( - &mut chunk_builder, + &mut remaining_chunk_builder, probe_row, build_data_types.len(), ) { - yield Self::process_left_outer_join_non_equi_condition( - spilled, - cond.as_ref(), - &mut non_equi_state, - ) - .await? + yield spilled } } } @@ -550,6 +547,10 @@ impl HashJoinExecutor { ) .await? } + + if let Some(spilled) = remaining_chunk_builder.consume_all() { + yield spilled + } } #[try_stream(boxed, ok = DataChunk, error = BatchError)] @@ -641,11 +642,12 @@ impl HashJoinExecutor { if !visible { continue; } - non_equi_state - .first_output_row_id - .push(chunk_builder.buffered_count()); non_equi_state.found_matched = false; if let Some(first_matched_build_row_id) = hash_map.get(probe_key) { + non_equi_state + .first_output_row_id + .push(chunk_builder.buffered_count()); + for build_row_id in next_build_row_with_same_key.row_id_iter(Some(*first_matched_build_row_id)) { @@ -749,12 +751,7 @@ impl HashJoinExecutor { &probe_chunk, probe_row_id, ) { - yield Self::process_left_semi_anti_join_non_equi_condition::( - spilled, - cond.as_ref(), - &mut non_equi_state, - ) - .await? + yield spilled } } } @@ -1187,13 +1184,7 @@ impl HashJoinExecutor { probe_row, build_data_types.len(), ) { - yield Self::process_full_outer_join_non_equi_condition( - spilled, - cond.as_ref(), - &mut left_non_equi_state, - &mut right_non_equi_state, - ) - .await? + yield spilled } } } @@ -2423,13 +2414,13 @@ mod tests { let expected_chunk = DataChunk::from_pretty( "i f i F - 1 6.1 . . 2 . . . - . 8.4 . . 3 3.9 . . - . . . . 4 6.6 4 7.5 3 . . . + 1 6.1 . . + . 8.4 . . + . . . . . 0.7 . . 5 . . . . 5.5 . .",