From 62d258ea92e6eaa1e0c5081e62b51e9a665782a1 Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Wed, 20 Dec 2023 22:04:04 +0300 Subject: [PATCH] additional arguments --- .../physical-plan/src/joins/hash_join.rs | 37 ++++++++++++++----- .../src/joins/symmetric_hash_join.rs | 2 + 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 13ac06ee301c0..e265e5cdc63d3 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -29,7 +29,6 @@ use crate::joins::utils::{ need_produce_result_in_final, JoinHashMap, JoinHashMapType, }; use crate::{ - coalesce_batches::concat_batches, coalesce_partitions::CoalescePartitionsExec, expressions::Column, expressions::PhysicalSortExpr, @@ -55,7 +54,7 @@ use arrow::array::{ UInt32BufferBuilder, UInt64Array, UInt64BufferBuilder, }; use arrow::compute::kernels::cmp::{eq, not_distinct}; -use arrow::compute::{and, take, FilterBuilder}; +use arrow::compute::{and, concat_batches, take, FilterBuilder}; use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow::util::bit_util; @@ -715,7 +714,7 @@ async fn collect_left_input( let mut hashmap = JoinHashMap::with_capacity(num_rows); let mut hashes_buffer = Vec::new(); let mut offset = 0; - for batch in batches.iter() { + for batch in batches.iter().rev() { hashes_buffer.clear(); hashes_buffer.resize(batch.num_rows(), 0); update_hash( @@ -726,12 +725,13 @@ async fn collect_left_input( &random_state, &mut hashes_buffer, 0, + true, )?; offset += batch.num_rows(); } // Merge all batches into a single batch, so we // can directly index into the arrays - let single_batch = concat_batches(&schema, &batches, num_rows)?; + let single_batch = concat_batches(&schema, batches.iter().rev())?; let data = JoinLeftData::new(hashmap, single_batch, reservation); Ok(data) @@ -747,6 +747,7 @@ pub fn update_hash( random_state: &RandomState, hashes_buffer: &mut Vec, deleted_offset: usize, + use_reverse_order: bool, ) -> Result<()> where T: JoinHashMapType, @@ -763,9 +764,15 @@ where // For usual JoinHashmap, the implementation is void. hash_map.extend_zero(batch.num_rows()); + let hash_values_iter: Box> = if use_reverse_order { + Box::new(hash_values.iter().enumerate().rev()) + } else { + Box::new(hash_values.iter().enumerate()) + }; + // insert hashes to key of the hashmap let (mut_map, mut_list) = hash_map.get_mut(); - for (row, hash_value) in hash_values.iter().enumerate() { + for (row, hash_value) in hash_values_iter { let item = mut_map.get_mut(*hash_value, |(hash, _)| *hash_value == *hash); if let Some((_, index)) = item { // Already exists: add index to next array @@ -987,6 +994,7 @@ pub fn build_equal_condition_join_indices( filter: Option<&JoinFilter>, build_side: JoinSide, deleted_offset: Option, + use_reverse_order: bool, ) -> Result<(UInt64Array, UInt32Array)> { let keys_values = probe_on .iter() @@ -1034,9 +1042,15 @@ pub fn build_equal_condition_join_indices( // (5,1) // // With this approach, the lexicographic order on both the probe side and the build side is preserved. + let hash_values_iter: Box> = if use_reverse_order { + Box::new(hash_values.iter().enumerate().rev()) + } else { + Box::new(hash_values.iter().enumerate()) + }; + let hash_map = build_hashmap.get_map(); let next_chain = build_hashmap.get_list(); - for (row, hash_value) in hash_values.iter().enumerate().rev() { + for (row, hash_value) in hash_values_iter { // Get the hash and find it in the build index // For every item on the build and probe we check if it matches @@ -1069,9 +1083,12 @@ pub fn build_equal_condition_join_indices( } } } - // Reversing both sets of indices - build_indices.as_slice_mut().reverse(); - probe_indices.as_slice_mut().reverse(); + + if use_reverse_order { + // Reversing both sets of indices + build_indices.as_slice_mut().reverse(); + probe_indices.as_slice_mut().reverse(); + } let left: UInt64Array = PrimitiveArray::new(build_indices.finish().into(), None); let right: UInt32Array = PrimitiveArray::new(probe_indices.finish().into(), None); @@ -1279,6 +1296,7 @@ impl HashJoinStream { self.filter.as_ref(), JoinSide::Left, None, + false, ); let result = match left_right_indices { @@ -2734,6 +2752,7 @@ mod tests { None, JoinSide::Left, None, + false, )?; let mut left_ids = UInt64Builder::with_capacity(0); diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index b9101b57c3e55..5979b6a4047ad 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -770,6 +770,7 @@ pub(crate) fn join_with_probe_batch( filter, build_hash_joiner.build_side, Some(build_hash_joiner.deleted_offset), + true, )?; if need_to_produce_result_in_final(build_hash_joiner.build_side, join_type) { record_visited_indices( @@ -882,6 +883,7 @@ impl OneSideHashJoiner { random_state, &mut self.hashes_buffer, self.deleted_offset, + false, )?; Ok(()) }