From 7945b84e86e30d9b61b99c02aae9497e07d31dfd Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Tue, 26 Mar 2024 15:48:28 +0800 Subject: [PATCH 1/2] fix(batch): fix hash join process some remaining chunk builder --- src/batch/src/executor/join/hash_join.rs | 31 ++++++++---------------- 1 file changed, 10 insertions(+), 21 deletions(-) diff --git a/src/batch/src/executor/join/hash_join.rs b/src/batch/src/executor/join/hash_join.rs index 8ac20990b5c8c..5b46681e52169 100644 --- a/src/batch/src/executor/join/hash_join.rs +++ b/src/batch/src/executor/join/hash_join.rs @@ -477,7 +477,8 @@ impl HashJoinExecutor { }: EquiJoinParams, cond: &BoxedExpression, ) { - let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size); + let mut chunk_builder = DataChunkBuilder::new(full_data_types.clone(), chunk_size); + let mut remaining_chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size); let mut non_equi_state = LeftNonEquiJoinState { probe_column_count: probe_data_types.len(), ..Default::default() @@ -527,16 +528,11 @@ impl HashJoinExecutor { shutdown_rx.check()?; let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id); if let Some(spilled) = Self::append_one_row_with_null_build_side( - &mut chunk_builder, + &mut remaining_chunk_builder, probe_row, build_data_types.len(), ) { - yield Self::process_left_outer_join_non_equi_condition( - spilled, - cond.as_ref(), - &mut non_equi_state, - ) - .await? + yield spilled } } } @@ -550,6 +546,10 @@ impl HashJoinExecutor { ) .await? } + + if let Some(spilled) = remaining_chunk_builder.consume_all() { + yield spilled + } } #[try_stream(boxed, ok = DataChunk, error = BatchError)] @@ -749,12 +749,7 @@ impl HashJoinExecutor { &probe_chunk, probe_row_id, ) { - yield Self::process_left_semi_anti_join_non_equi_condition::( - spilled, - cond.as_ref(), - &mut non_equi_state, - ) - .await? + yield spilled } } } @@ -1187,13 +1182,7 @@ impl HashJoinExecutor { probe_row, build_data_types.len(), ) { - yield Self::process_full_outer_join_non_equi_condition( - spilled, - cond.as_ref(), - &mut left_non_equi_state, - &mut right_non_equi_state, - ) - .await? + yield spilled } } } From 1f11695e6086ca91c9dd7abd1c9dd15fd42b2ecd Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Tue, 26 Mar 2024 16:24:24 +0800 Subject: [PATCH 2/2] fix ut --- src/batch/src/executor/join/hash_join.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/batch/src/executor/join/hash_join.rs b/src/batch/src/executor/join/hash_join.rs index 5b46681e52169..c9bc0caad7cef 100644 --- a/src/batch/src/executor/join/hash_join.rs +++ b/src/batch/src/executor/join/hash_join.rs @@ -497,10 +497,11 @@ impl HashJoinExecutor { continue; } non_equi_state.found_matched = false; - non_equi_state - .first_output_row_id - .push(chunk_builder.buffered_count()); if let Some(first_matched_build_row_id) = hash_map.get(probe_key) { + non_equi_state + .first_output_row_id + .push(chunk_builder.buffered_count()); + let mut build_row_id_iter = next_build_row_with_same_key .row_id_iter(Some(*first_matched_build_row_id)) .peekable(); @@ -641,11 +642,12 @@ impl HashJoinExecutor { if !visible { continue; } - non_equi_state - .first_output_row_id - .push(chunk_builder.buffered_count()); non_equi_state.found_matched = false; if let Some(first_matched_build_row_id) = hash_map.get(probe_key) { + non_equi_state + .first_output_row_id + .push(chunk_builder.buffered_count()); + for build_row_id in next_build_row_with_same_key.row_id_iter(Some(*first_matched_build_row_id)) { @@ -2412,13 +2414,13 @@ mod tests { let expected_chunk = DataChunk::from_pretty( "i f i F - 1 6.1 . . 2 . . . - . 8.4 . . 3 3.9 . . - . . . . 4 6.6 4 7.5 3 . . . + 1 6.1 . . + . 8.4 . . + . . . . . 0.7 . . 5 . . . . 5.5 . .",