From 7945b84e86e30d9b61b99c02aae9497e07d31dfd Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Tue, 26 Mar 2024 15:48:28 +0800 Subject: [PATCH] fix(batch): fix hash join process some remaining chunk builder --- src/batch/src/executor/join/hash_join.rs | 31 ++++++++---------------- 1 file changed, 10 insertions(+), 21 deletions(-) diff --git a/src/batch/src/executor/join/hash_join.rs b/src/batch/src/executor/join/hash_join.rs index 8ac20990b5c8c..5b46681e52169 100644 --- a/src/batch/src/executor/join/hash_join.rs +++ b/src/batch/src/executor/join/hash_join.rs @@ -477,7 +477,8 @@ impl HashJoinExecutor { }: EquiJoinParams, cond: &BoxedExpression, ) { - let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size); + let mut chunk_builder = DataChunkBuilder::new(full_data_types.clone(), chunk_size); + let mut remaining_chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size); let mut non_equi_state = LeftNonEquiJoinState { probe_column_count: probe_data_types.len(), ..Default::default() @@ -527,16 +528,11 @@ impl HashJoinExecutor { shutdown_rx.check()?; let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id); if let Some(spilled) = Self::append_one_row_with_null_build_side( - &mut chunk_builder, + &mut remaining_chunk_builder, probe_row, build_data_types.len(), ) { - yield Self::process_left_outer_join_non_equi_condition( - spilled, - cond.as_ref(), - &mut non_equi_state, - ) - .await? + yield spilled } } } @@ -550,6 +546,10 @@ impl HashJoinExecutor { ) .await? } + + if let Some(spilled) = remaining_chunk_builder.consume_all() { + yield spilled + } } #[try_stream(boxed, ok = DataChunk, error = BatchError)] @@ -749,12 +749,7 @@ impl HashJoinExecutor { &probe_chunk, probe_row_id, ) { - yield Self::process_left_semi_anti_join_non_equi_condition::( - spilled, - cond.as_ref(), - &mut non_equi_state, - ) - .await? + yield spilled } } } @@ -1187,13 +1182,7 @@ impl HashJoinExecutor { probe_row, build_data_types.len(), ) { - yield Self::process_full_outer_join_non_equi_condition( - spilled, - cond.as_ref(), - &mut left_non_equi_state, - &mut right_non_equi_state, - ) - .await? + yield spilled } } }