diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 9aa776fe054c..bdff46c49853 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -78,6 +78,9 @@ use futures::{ready, Stream, StreamExt, TryStreamExt}; type JoinLeftData = (JoinHashMap, RecordBatch, MemoryReservation); +/// Tuple representing last matched probe-build side indices for partial join output +type MatchedIndicesPair = Option<(usize, usize)>; + /// Join execution plan executes partitions in parallel and combines them into a set of /// partitions. /// @@ -465,6 +468,8 @@ impl ExecutionPlan for HashJoinExec { } }; + let batch_size = context.session_config().batch_size(); + let reservation = MemoryConsumer::new(format!("HashJoinStream[{partition}]")) .register(context.memory_pool()); @@ -487,6 +492,9 @@ impl ExecutionPlan for HashJoinExec { null_equals_null: self.null_equals_null, is_exhausted: false, reservation, + batch_size, + last_matched_indices: None, + probe_batch: None, })) } @@ -682,6 +690,15 @@ struct HashJoinStream { null_equals_null: bool, /// Memory reservation reservation: MemoryReservation, + /// Batch size + batch_size: usize, + /// Current probe batch + probe_batch: Option, + /// In case joining current probe batch with build side may produce more than `batch_size` records + /// (cross-join due to key duplication on build side) `HashJoinStream` saves last matched indices + /// and emits result batch to upstream operator. + /// On next poll these indices are used to skip already matched rows. 
+ last_matched_indices: MatchedIndicesPair, } impl RecordBatchStream for HashJoinStream { @@ -734,7 +751,9 @@ pub fn build_equal_condition_join_indices( filter: Option<&JoinFilter>, build_side: JoinSide, deleted_offset: Option, -) -> Result<(UInt64Array, UInt32Array)> { + output_limit: usize, + start_indices: MatchedIndicesPair, +) -> Result<(UInt64Array, UInt32Array, MatchedIndicesPair)> { let keys_values = probe_on .iter() .map(|c| Ok(c.evaluate(probe_batch)?.into_array(probe_batch.num_rows()))) @@ -783,16 +802,37 @@ pub fn build_equal_condition_join_indices( // With this approach, the lexicographic order on both the probe side and the build side is preserved. let hash_map = build_hashmap.get_map(); let next_chain = build_hashmap.get_list(); - for (row, hash_value) in hash_values.iter().enumerate().rev() { - // Get the hash and find it in the build index + + let mut output_tuples = 0_usize; + let mut last_matched_indices = None; + + let (initial_probe, initial_build) = + start_indices.map_or_else(|| (0, 0), |pair| pair); + + 'probe: for (row, hash_value) in hash_values.iter().enumerate().skip(initial_probe) { + let index = if start_indices.is_some() && row == initial_probe { + // in case of partially skipped input -- calculating next build index + // using last matched pair + let next = next_chain[initial_build]; + if next == 0 { + continue; + } + Some(next) + } else if let Some((_, index)) = + hash_map.get(*hash_value, |(hash, _)| *hash_value == *hash) + { + // otherwise -- checking build hashmap for presence of current hash_value + Some(*index) + } else { + None + }; // For every item on the build and probe we check if it matches // This possibly contains rows with hash collisions, // So we have to check here whether rows are equal or not - if let Some((_, index)) = - hash_map.get(*hash_value, |(hash, _)| *hash_value == *hash) - { - let mut i = *index - 1; + if let Some(index) = index { + let mut i = index - 1; + loop { let build_row_value = if let Some(offset) 
= deleted_offset { // This arguments means that we prune the next index way before here. @@ -806,8 +846,17 @@ pub fn build_equal_condition_join_indices( }; build_indices.append(build_row_value); probe_indices.append(row as u32); + + output_tuples += 1; + + if output_tuples >= output_limit { + last_matched_indices = Some((row, i as usize)); + break 'probe; + } + // Follow the chain to get the next index value let next = next_chain[build_row_value as usize]; + if next == 0 { // end of list break; @@ -816,9 +865,13 @@ pub fn build_equal_condition_join_indices( } } } - // Reversing both sets of indices - build_indices.as_slice_mut().reverse(); - probe_indices.as_slice_mut().reverse(); + + // if both probe and build sides have been scanned -- return None + if last_matched_indices + .is_some_and(|(probe, build)| probe == probe_batch.num_rows() - 1 && build == 0) + { + last_matched_indices = None + }; let left: UInt64Array = PrimitiveArray::new(build_indices.finish().into(), None); let right: UInt32Array = PrimitiveArray::new(probe_indices.finish().into(), None); @@ -837,13 +890,15 @@ pub fn build_equal_condition_join_indices( (left, right) }; - equal_rows_arr( + let matched_indices = equal_rows_arr( &left, &right, &build_join_values, &keys_values, null_equals_null, - ) + )?; + + Ok((matched_indices.0, matched_indices.1, last_matched_indices)) } // version of eq_dyn supporting equality on null arrays @@ -942,107 +997,140 @@ impl HashJoinStream { } }); let mut hashes_buffer = vec![]; - self.right - .poll_next_unpin(cx) - .map(|maybe_batch| match maybe_batch { - // one right batch in the join loop - Some(Ok(batch)) => { - self.join_metrics.input_batches.add(1); - self.join_metrics.input_rows.add(batch.num_rows()); - let timer = self.join_metrics.join_time.timer(); - - // get the matched two indices for the on condition - let left_right_indices = build_equal_condition_join_indices( - &left_data.0, - &left_data.1, - &batch, - &self.on_left, - &self.on_right, - 
&self.random_state, - self.null_equals_null, - &mut hashes_buffer, - self.filter.as_ref(), - JoinSide::Left, - None, - ); - let result = match left_right_indices { - Ok((left_side, right_side)) => { - // set the left bitmap - // and only left, full, left semi, left anti need the left bitmap - if need_produce_result_in_final(self.join_type) { - left_side.iter().flatten().for_each(|x| { - visited_left_side.set_bit(x as usize, true); - }); - } - - // adjust the two side indices base on the join type - let (left_side, right_side) = adjust_indices_by_join_type( - left_side, - right_side, - batch.num_rows(), - self.join_type, - ); - - let result = build_batch_from_indices( - &self.schema, - &left_data.1, - &batch, - &left_side, - &right_side, - &self.column_indices, - JoinSide::Left, - ); - self.join_metrics.output_batches.add(1); - self.join_metrics.output_rows.add(batch.num_rows()); - Some(result) - } - Err(err) => Some(exec_err!( - "Fail to build join indices in HashJoinExec, error:{err}" - )), - }; - timer.done(); - result + // Fetch next probe batch + if self.probe_batch.is_none() { + match self.right.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(batch))) => { + self.probe_batch = Some(batch); } - None => { - let timer = self.join_metrics.join_time.timer(); - if need_produce_result_in_final(self.join_type) && !self.is_exhausted - { - // use the global left bitmap to produce the left indices and right indices - let (left_side, right_side) = get_final_indices_from_bit_map( - visited_left_side, + Poll::Ready(None) => { + self.probe_batch = None; + } + Poll::Ready(Some(err)) => return Poll::Ready(Some(err)), + Poll::Pending => return Poll::Pending, + } + } + + let output_batch = match &self.probe_batch { + // one right batch in the join loop + Some(batch) => { + self.join_metrics.input_batches.add(1); + self.join_metrics.input_rows.add(batch.num_rows()); + let timer = self.join_metrics.join_time.timer(); + + // get the matched two indices for the on condition + let 
left_right_indices = build_equal_condition_join_indices( + &left_data.0, + &left_data.1, + batch, + &self.on_left, + &self.on_right, + &self.random_state, + self.null_equals_null, + &mut hashes_buffer, + self.filter.as_ref(), + JoinSide::Left, + None, + self.batch_size, + self.last_matched_indices, + ); + + let result = match left_right_indices { + Ok((left_side, right_side, last_matched_indices)) => { + // set the left bitmap + // and only left, full, left semi, left anti need the left bitmap + if need_produce_result_in_final(self.join_type) { + left_side.iter().flatten().for_each(|x| { + visited_left_side.set_bit(x as usize, true); + }); + } + + // adjust the two side indices based on the join type + // no need to adjust `self.last_matched_indices.0` (if some) + // as it has been joined during the previous iteration + let adjust_range = + match (self.last_matched_indices, last_matched_indices) { + (None, None) => 0..batch.num_rows(), + (None, Some((range_end, _))) => 0..range_end + 1, + (Some((range_start, _)), None) => { + range_start + 1..batch.num_rows() + } + (Some((range_start, _)), Some((range_end, _))) => { + range_start + 1..range_end + 1 + } + }; + + let (left_side, right_side) = adjust_indices_by_join_type( + left_side, + right_side, + adjust_range, self.join_type, ); - let empty_right_batch = - RecordBatch::new_empty(self.right.schema()); - // use the left and right indices to produce the batch result + let result = build_batch_from_indices( &self.schema, &left_data.1, - &empty_right_batch, + batch, &left_side, &right_side, &self.column_indices, JoinSide::Left, ); + self.join_metrics.output_batches.add(1); + self.join_metrics.output_rows.add(batch.num_rows()); - if let Ok(ref batch) = result { - self.join_metrics.input_batches.add(1); - self.join_metrics.input_rows.add(batch.num_rows()); + if last_matched_indices.is_none() { + self.probe_batch = None; + }; + self.last_matched_indices = last_matched_indices; - self.join_metrics.output_batches.add(1); - 
self.join_metrics.output_rows.add(batch.num_rows()); - } - timer.done(); - self.is_exhausted = true; Some(result) - } else { - // end of the join loop - None } + Err(err) => Some(exec_err!( + "Fail to build join indices in HashJoinExec, error:{err}" + )), + }; + + timer.done(); + result + } + None => { + let timer = self.join_metrics.join_time.timer(); + if need_produce_result_in_final(self.join_type) && !self.is_exhausted { + // use the global left bitmap to produce the left indices and right indices + let (left_side, right_side) = + get_final_indices_from_bit_map(visited_left_side, self.join_type); + let empty_right_batch = RecordBatch::new_empty(self.right.schema()); + // use the left and right indices to produce the batch result + let result = build_batch_from_indices( + &self.schema, + &left_data.1, + &empty_right_batch, + &left_side, + &right_side, + &self.column_indices, + JoinSide::Left, + ); + + if let Ok(ref batch) = result { + self.join_metrics.input_batches.add(1); + self.join_metrics.input_rows.add(batch.num_rows()); + + self.join_metrics.output_batches.add(1); + self.join_metrics.output_rows.add(batch.num_rows()); + } + timer.done(); + self.is_exhausted = true; + Some(result) + } else { + // end of the join loop + None } - Some(err) => Some(err), - }) + } + }; + + Poll::Ready(output_batch) } } @@ -2406,7 +2494,7 @@ mod tests { }, left, ); - let (l, r) = build_equal_condition_join_indices( + let (l, r, _) = build_equal_condition_join_indices( &left_data.0, &left_data.1, &right, @@ -2418,6 +2506,8 @@ mod tests { None, JoinSide::Left, None, + 64, + None, )?; let mut left_ids = UInt64Builder::with_capacity(0); diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index a113066e39d1..73fd5c1caec7 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -670,20 +670,20 @@ fn adjust_indices_by_join_type( // matched 
// unmatched right row will be produced in this batch let right_unmatched_indices = - get_anti_indices(count_right_batch, &right_indices); + get_anti_indices(0..count_right_batch, &right_indices); // combine the matched and unmatched right result together append_right_indices(left_indices, right_indices, right_unmatched_indices) } JoinType::RightSemi => { // need to remove the duplicated record in the right side - let right_indices = get_semi_indices(count_right_batch, &right_indices); + let right_indices = get_semi_indices(0..count_right_batch, &right_indices); // the left_indices will not be used later for the `right semi` join (left_indices, right_indices) } JoinType::RightAnti => { // need to remove the duplicated record in the right side // get the anti index for the right side - let right_indices = get_anti_indices(count_right_batch, &right_indices); + let right_indices = get_anti_indices(0..count_right_batch, &right_indices); // the left_indices will not be used later for the `right anti` join (left_indices, right_indices) } diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 00d43aead434..5f0db8117846 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -818,7 +818,7 @@ pub(crate) fn join_with_probe_batch( if build_hash_joiner.input_buffer.num_rows() == 0 || probe_batch.num_rows() == 0 { return Ok(None); } - let (build_indices, probe_indices) = build_equal_condition_join_indices( + let (build_indices, probe_indices, _) = build_equal_condition_join_indices( &build_hash_joiner.hashmap, &build_hash_joiner.input_buffer, probe_batch, @@ -830,6 +830,8 @@ pub(crate) fn join_with_probe_batch( filter, build_hash_joiner.build_side, Some(build_hash_joiner.deleted_offset), + usize::MAX, + None, )?; if need_to_produce_result_in_final(build_hash_joiner.build_side, join_type) { record_visited_indices( 
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index cf150ddf575f..01d43c2b51b1 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -19,6 +19,7 @@ use std::collections::HashSet; use std::future::Future; +use std::ops::Range; use std::sync::Arc; use std::task::{Context, Poll}; use std::usize; @@ -847,7 +848,7 @@ pub(crate) fn build_batch_from_indices( pub(crate) fn adjust_indices_by_join_type( left_indices: UInt64Array, right_indices: UInt32Array, - count_right_batch: usize, + adjust_range: Range, join_type: JoinType, ) -> (UInt64Array, UInt32Array) { match join_type { @@ -863,21 +864,20 @@ pub(crate) fn adjust_indices_by_join_type( JoinType::Right | JoinType::Full => { // matched // unmatched right row will be produced in this batch - let right_unmatched_indices = - get_anti_indices(count_right_batch, &right_indices); + let right_unmatched_indices = get_anti_indices(adjust_range, &right_indices); // combine the matched and unmatched right result together append_right_indices(left_indices, right_indices, right_unmatched_indices) } JoinType::RightSemi => { // need to remove the duplicated record in the right side - let right_indices = get_semi_indices(count_right_batch, &right_indices); + let right_indices = get_semi_indices(adjust_range, &right_indices); // the left_indices will not be used later for the `right semi` join (left_indices, right_indices) } JoinType::RightAnti => { // need to remove the duplicated record in the right side // get the anti index for the right side - let right_indices = get_anti_indices(count_right_batch, &right_indices); + let right_indices = get_anti_indices(adjust_range, &right_indices); // the left_indices will not be used later for the `right anti` join (left_indices, right_indices) } @@ -919,20 +919,26 @@ pub(crate) fn append_right_indices( } } -/// Get unmatched and deduplicated indices +/// Get unmatched and 
deduplicated indices for specified range of indices pub(crate) fn get_anti_indices( - row_count: usize, + rg: Range, input_indices: &UInt32Array, ) -> UInt32Array { - let mut bitmap = BooleanBufferBuilder::new(row_count); - bitmap.append_n(row_count, false); - input_indices.iter().flatten().for_each(|v| { - bitmap.set_bit(v as usize, true); - }); + let mut bitmap = BooleanBufferBuilder::new(rg.len()); + bitmap.append_n(rg.len(), false); + input_indices + .iter() + .flatten() + .map(|v| v as usize) + .filter(|v| rg.contains(v)) + .for_each(|v| { + bitmap.set_bit(v - rg.start, true); + }); + + let offset = rg.start; // get the anti index - (0..row_count) - .filter_map(|idx| (!bitmap.get_bit(idx)).then_some(idx as u32)) + (rg).filter_map(|idx| (!bitmap.get_bit(idx - offset)).then_some(idx as u32)) .collect::() } @@ -953,20 +959,26 @@ pub(crate) fn get_anti_u64_indices( .collect::() } -/// Get matched and deduplicated indices +/// Get matched and deduplicated indices for specified range of indices pub(crate) fn get_semi_indices( - row_count: usize, + rg: Range, input_indices: &UInt32Array, ) -> UInt32Array { - let mut bitmap = BooleanBufferBuilder::new(row_count); - bitmap.append_n(row_count, false); - input_indices.iter().flatten().for_each(|v| { - bitmap.set_bit(v as usize, true); - }); + let mut bitmap = BooleanBufferBuilder::new(rg.len()); + bitmap.append_n(rg.len(), false); + input_indices + .iter() + .flatten() + .map(|v| v as usize) + .filter(|v| rg.contains(v)) + .for_each(|v| { + bitmap.set_bit(v - rg.start, true); + }); + + let offset = rg.start; // get the semi index - (0..row_count) - .filter_map(|idx| (bitmap.get_bit(idx)).then_some(idx as u32)) + (rg).filter_map(|idx| (bitmap.get_bit(idx - offset)).then_some(idx as u32)) .collect::() } diff --git a/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt b/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt index 1312f2916ed6..e5d4c25f48c8 100644 --- 
a/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt +++ b/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt @@ -72,11 +72,11 @@ SELECT t1.a, t1.b, t1.c, t2.a as a2 ON t1.d = t2.d ORDER BY a2, t2.b LIMIT 5 ---- -0 0 0 0 -0 0 2 0 -0 0 3 0 -0 0 6 0 -0 0 20 0 +1 3 95 0 +1 3 93 0 +1 3 92 0 +1 3 81 0 +1 3 76 0 query TT EXPLAIN SELECT t2.a as a2, t2.b