diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 1a2db87d98a2..69194fd2ad38 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -725,20 +725,29 @@ where // insert hashes to key of the hashmap let (mut_map, mut_list) = hash_map.get_mut(); for (row, hash_value) in hash_values.iter().enumerate() { - let item = mut_map.get_mut(*hash_value, |(hash, _)| *hash_value == *hash); - if let Some((_, index)) = item { + let item = mut_map.get_mut(*hash_value, |(hash, _, _)| *hash_value == *hash); + if let Some((_, _, tail_index)) = item { + // let head = *index as usize; + // // Get tail for current chain + // let tail = chain_tails[head]; + // // Set next value for current tail + // mut_list[tail - 1] = (row + offset + 1) as u64; + // // Sett current row as new tail + // chain_tails[head] = row + offset + 1; + + // Already exists: add index to next array - let prev_index = *index; + let tail = *tail_index as usize; // Store new value inside hashmap - *index = (row + offset + 1) as u64; + *tail_index = (row + offset) as u64; // Update chained Vec at row + offset with previous value - mut_list[row + offset - deleted_offset] = prev_index; + mut_list[tail - deleted_offset] = (row + offset + 1) as u64; } else { mut_map.insert( *hash_value, // store the value + 1 as 0 value reserved for end of list - (*hash_value, (row + offset + 1) as u64), - |(hash, _)| *hash, + (*hash_value, (row + offset + 1) as u64, (row + offset) as u64), + |(hash, _, _)| *hash, ); // chained list at (row + offset) is already initialized with 0 // meaning end of list @@ -903,14 +912,14 @@ pub fn build_equal_condition_join_indices( // With this approach, the lexicographic order on both the probe side and the build side is preserved. let hash_map = build_hashmap.get_map(); let next_chain = build_hashmap.get_list(); - for (row, hash_value) in hash_values.iter().enumerate().rev() { + for (row, hash_value) in hash_values.iter().enumerate() { // Get the hash and find it in the build index // For every item on the build and probe we check if it matches // This possibly contains rows with hash collisions, // So we have to check here whether rows are equal or not - if let Some((_, index)) = - hash_map.get(*hash_value, |(hash, _)| *hash_value == *hash) + if let Some((_, index, _)) = + hash_map.get(*hash_value, |(hash, _, _)| *hash_value == *hash) { let mut i = *index - 1; loop { @@ -936,9 +945,6 @@ pub fn build_equal_condition_join_indices( } } } - // Reversing both sets of indices - build_indices.as_slice_mut().reverse(); - probe_indices.as_slice_mut().reverse(); let left: UInt64Array = PrimitiveArray::new(build_indices.finish().into(), None); let right: UInt32Array = PrimitiveArray::new(probe_indices.finish().into(), None); @@ -2508,8 +2514,8 @@ mod tests { create_hashes(&[left.columns()[0].clone()], &random_state, hashes_buff)?; // Create hash collisions (same hashes) - hashmap_left.insert(hashes[0], (hashes[0], 1), |(h, _)| *h); - hashmap_left.insert(hashes[1], (hashes[1], 1), |(h, _)| *h); + hashmap_left.insert(hashes[0], (hashes[0], 1, 1), |(h, _, _)| *h); + hashmap_left.insert(hashes[1], (hashes[1], 1, 1), |(h, _, _)| *h); let next = vec![2, 0]; diff --git a/datafusion/physical-plan/src/joins/hash_join_utils.rs b/datafusion/physical-plan/src/joins/hash_join_utils.rs index c134b23d78cf..236fac1f016a 100644 --- a/datafusion/physical-plan/src/joins/hash_join_utils.rs +++ b/datafusion/physical-plan/src/joins/hash_join_utils.rs @@ -103,7 +103,7 @@ use hashbrown::HashSet; /// ``` pub struct JoinHashMap { // Stores hash value to last row index - pub map: RawTable<(u64, u64)>, + pub map: RawTable<(u64, u64, u64)>, // Stores indices in chained list data structure pub next: Vec, } @@ -124,9 +124,9 @@ pub trait JoinHashMapType { /// Extend with zero fn extend_zero(&mut self, len: usize); /// Returns mutable references to the hash map and the next. - fn get_mut(&mut self) -> (&mut RawTable<(u64, u64)>, &mut Self::NextType); + fn get_mut(&mut self) -> (&mut RawTable<(u64, u64, u64)>, &mut Self::NextType); /// Returns a reference to the hash map. - fn get_map(&self) -> &RawTable<(u64, u64)>; + fn get_map(&self) -> &RawTable<(u64, u64, u64)>; /// Returns a reference to the next. fn get_list(&self) -> &Self::NextType; } @@ -139,12 +139,12 @@ impl JoinHashMapType for JoinHashMap { fn extend_zero(&mut self, _: usize) {} /// Get mutable references to the hash map and the next. - fn get_mut(&mut self) -> (&mut RawTable<(u64, u64)>, &mut Self::NextType) { + fn get_mut(&mut self) -> (&mut RawTable<(u64, u64, u64)>, &mut Self::NextType) { (&mut self.map, &mut self.next) } /// Get a reference to the hash map. - fn get_map(&self) -> &RawTable<(u64, u64)> { + fn get_map(&self) -> &RawTable<(u64, u64, u64)> { &self.map } @@ -164,12 +164,12 @@ impl JoinHashMapType for PruningJoinHashMap { } /// Get mutable references to the hash map and the next. - fn get_mut(&mut self) -> (&mut RawTable<(u64, u64)>, &mut Self::NextType) { + fn get_mut(&mut self) -> (&mut RawTable<(u64, u64, u64)>, &mut Self::NextType) { (&mut self.map, &mut self.next) } /// Get a reference to the hash map. - fn get_map(&self) -> &RawTable<(u64, u64)> { + fn get_map(&self) -> &RawTable<(u64, u64, u64)> { &self.map } @@ -222,7 +222,7 @@ impl fmt::Debug for JoinHashMap { /// ``` pub struct PruningJoinHashMap { /// Stores hash value to last row index - pub map: RawTable<(u64, u64)>, + pub map: RawTable<(u64, u64, u64)>, /// Stores indices in chained list data structure pub next: VecDeque, } @@ -262,7 +262,7 @@ impl PruningJoinHashMap { if capacity > scale_factor * self.map.len() { let new_capacity = (capacity * (scale_factor - 1)) / scale_factor; // Resize the map with the new capacity. - self.map.shrink_to(new_capacity, |(hash, _)| *hash) + self.map.shrink_to(new_capacity, |(hash, _, _)| *hash) } } @@ -298,7 +298,7 @@ impl PruningJoinHashMap { self.map .iter() .map(|bucket| bucket.as_ref()) - .filter_map(|(hash, tail_index)| { + .filter_map(|(hash, tail_index, _)| { (*tail_index < prune_length as u64 + deleting_offset).then_some(*hash) }) .collect::>() @@ -307,7 +307,7 @@ impl PruningJoinHashMap { // Remove the keys from the map. removable_keys.into_iter().for_each(|hash_value| { self.map - .remove_entry(hash_value, |(hash, _)| hash_value == *hash); + .remove_entry(hash_value, |(hash, _, _)| hash_value == *hash); }); // Shrink the map if necessary. @@ -1091,8 +1091,8 @@ pub mod tests { for hash_value in 0..data_size { join_hash_map.map.insert( hash_value, - (hash_value, hash_value), - |(hash, _)| *hash, + (hash_value, hash_value, hash_value), + |(hash, _, _)| *hash, ); } @@ -1103,7 +1103,7 @@ pub mod tests { for hash_value in 0..deleted_part { join_hash_map .map - .remove_entry(hash_value, |(hash, _)| hash_value == *hash); + .remove_entry(hash_value, |(hash, _, _)| hash_value == *hash); } assert_eq!(join_hash_map.map.len(), (data_size - deleted_part) as usize);