Commit 9672505
chain tail offset in hashmap
korowa committed Nov 10, 2023
1 parent 91c9d6f commit 9672505
Showing 2 changed files with 35 additions and 29 deletions.
36 changes: 21 additions & 15 deletions datafusion/physical-plan/src/joins/hash_join.rs
@@ -725,20 +725,29 @@ where
// insert hashes to key of the hashmap
let (mut_map, mut_list) = hash_map.get_mut();
for (row, hash_value) in hash_values.iter().enumerate() {
- let item = mut_map.get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
- if let Some((_, index)) = item {
+ let item = mut_map.get_mut(*hash_value, |(hash, _, _)| *hash_value == *hash);
+ if let Some((_, _, tail_index)) = item {
+ // let head = *index as usize;
+ // // Get tail for current chain
+ // let tail = chain_tails[head];
+ // // Set next value for current tail
+ // mut_list[tail - 1] = (row + offset + 1) as u64;
+ // // Sett current row as new tail
+ // chain_tails[head] = row + offset + 1;
+
+
// Already exists: add index to next array
- let prev_index = *index;
+ let tail = *tail_index as usize;
// Store new value inside hashmap
- *index = (row + offset + 1) as u64;
+ *tail_index = (row + offset) as u64;
// Update chained Vec at row + offset with previous value
- mut_list[row + offset - deleted_offset] = prev_index;
+ mut_list[tail - deleted_offset] = (row + offset + 1) as u64;
} else {
mut_map.insert(
*hash_value,
// store the value + 1 as 0 value reserved for end of list
- (*hash_value, (row + offset + 1) as u64),
- |(hash, _)| *hash,
+ (*hash_value, (row + offset + 1) as u64, (row + offset) as u64),
+ |(hash, _, _)| *hash,
);
// chained list at (row + offset) is already initialized with 0
// meaning end of list
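The hunk above switches the build side from prepending at the chain head to appending at a tracked chain tail, so each chain ends up in insertion order. Below is a minimal, self-contained sketch of that layout, assuming a plain std HashMap in place of hashbrown's RawTable and ignoring the deleted_offset used by PruningJoinHashMap; build_chains and its signature are illustrative names, not part of the patch.

// Sketch only: std HashMap stands in for hashbrown's RawTable, and
// `deleted_offset` handling is omitted.
use std::collections::hash_map::Entry;
use std::collections::HashMap;

// map: hash -> (head, tail). `head` is the 1-based index of the first row
// inserted for that hash (0 is reserved for "end of chain" in `next`);
// `tail` is the 0-based position of the last row, i.e. a direct index into `next`.
fn build_chains(hashes: &[u64], offset: usize) -> (HashMap<u64, (u64, u64)>, Vec<u64>) {
    let mut map: HashMap<u64, (u64, u64)> = HashMap::new();
    // next[i] == 0 means end of chain, otherwise the 1-based index of the
    // next row that shares the same hash value.
    let mut next = vec![0u64; hashes.len() + offset];
    for (row, hash) in hashes.iter().enumerate() {
        match map.entry(*hash) {
            Entry::Occupied(mut entry) => {
                let (_head, tail) = entry.get_mut();
                // Append at the tail: link the previous tail to this row...
                next[*tail as usize] = (row + offset + 1) as u64;
                // ...and record this row as the new tail.
                *tail = (row + offset) as u64;
            }
            Entry::Vacant(entry) => {
                // First row for this hash: it is both head and tail.
                entry.insert(((row + offset + 1) as u64, (row + offset) as u64));
            }
        }
    }
    (map, next)
}

Since the head entry is written once and never updated, walking next from the head visits the rows of a chain oldest-first.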
@@ -903,14 +912,14 @@ pub fn build_equal_condition_join_indices<T: JoinHashMapType>(
// With this approach, the lexicographic order on both the probe side and the build side is preserved.
let hash_map = build_hashmap.get_map();
let next_chain = build_hashmap.get_list();
- for (row, hash_value) in hash_values.iter().enumerate().rev() {
+ for (row, hash_value) in hash_values.iter().enumerate() {
// Get the hash and find it in the build index

// For every item on the build and probe we check if it matches
// This possibly contains rows with hash collisions,
// So we have to check here whether rows are equal or not
- if let Some((_, index)) =
- hash_map.get(*hash_value, |(hash, _)| *hash_value == *hash)
+ if let Some((_, index, _)) =
+ hash_map.get(*hash_value, |(hash, _, _)| *hash_value == *hash)
{
let mut i = *index - 1;
loop {
@@ -936,9 +945,6 @@
}
}
}
- // Reversing both sets of indices
- build_indices.as_slice_mut().reverse();
- probe_indices.as_slice_mut().reverse();

let left: UInt64Array = PrimitiveArray::new(build_indices.finish().into(), None);
let right: UInt32Array = PrimitiveArray::new(probe_indices.finish().into(), None);
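Because chains are now linked head-to-tail, the probe loop above iterates hash_values forward and walks each chain from its head, which already yields build indices in ascending order; that is why the final reverse() of build_indices and probe_indices is dropped. A rough probe-side sketch against the toy build_chains layout above, with equality checks for hash collisions and output-size limiting deliberately omitted:

// Sketch only: collision checks on the actual join keys and batch size
// limits from the real implementation are left out.
fn probe_chains(
    map: &std::collections::HashMap<u64, (u64, u64)>,
    next: &[u64],
    probe_hashes: &[u64],
) -> (Vec<u64>, Vec<u32>) {
    let mut build_indices = Vec::new();
    let mut probe_indices = Vec::new();
    for (row, hash) in probe_hashes.iter().enumerate() {
        if let Some((head, _tail)) = map.get(hash) {
            // `head` is 1-based; follow `next` until a 0 terminates the chain.
            let mut i = *head - 1;
            loop {
                build_indices.push(i);
                probe_indices.push(row as u32);
                match next[i as usize] {
                    0 => break,
                    n => i = n - 1,
                }
            }
        }
    }
    (build_indices, probe_indices)
}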
Expand Down Expand Up @@ -2508,8 +2514,8 @@ mod tests {
create_hashes(&[left.columns()[0].clone()], &random_state, hashes_buff)?;

// Create hash collisions (same hashes)
- hashmap_left.insert(hashes[0], (hashes[0], 1), |(h, _)| *h);
- hashmap_left.insert(hashes[1], (hashes[1], 1), |(h, _)| *h);
+ hashmap_left.insert(hashes[0], (hashes[0], 1, 1), |(h, _, _)| *h);
+ hashmap_left.insert(hashes[1], (hashes[1], 1, 1), |(h, _, _)| *h);

let next = vec![2, 0];

28 changes: 14 additions & 14 deletions datafusion/physical-plan/src/joins/hash_join_utils.rs
@@ -103,7 +103,7 @@ use hashbrown::HashSet;
/// ```
pub struct JoinHashMap {
// Stores hash value to last row index
- pub map: RawTable<(u64, u64)>,
+ pub map: RawTable<(u64, u64, u64)>,
// Stores indices in chained list data structure
pub next: Vec<u64>,
}
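To make the new three-element entry concrete, here is an assumed example of the layout after this change (not taken from the file's doc comment): build rows 0, 2 and 4 share hash H, while rows 1 and 3 hash to distinct values A and B.

// Assumed illustration of the (hash, head, tail) layout, not from the patch:
//   map:  H -> (H, head = 1, tail = 4)   // head is 1-based (row 0); tail indexes `next` directly (row 4)
//         A -> (A, 2, 1)
//         B -> (B, 4, 3)
//   next: [3, 0, 5, 0, 0]                // next[0] = 3 links row 0 -> row 2, next[2] = 5 links row 2 -> row 4
//
// Walking H from its head visits rows 0, 2, 4 in insertion order.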
@@ -124,9 +124,9 @@ pub trait JoinHashMapType {
/// Extend with zero
fn extend_zero(&mut self, len: usize);
/// Returns mutable references to the hash map and the next.
- fn get_mut(&mut self) -> (&mut RawTable<(u64, u64)>, &mut Self::NextType);
+ fn get_mut(&mut self) -> (&mut RawTable<(u64, u64, u64)>, &mut Self::NextType);
/// Returns a reference to the hash map.
- fn get_map(&self) -> &RawTable<(u64, u64)>;
+ fn get_map(&self) -> &RawTable<(u64, u64, u64)>;
/// Returns a reference to the next.
fn get_list(&self) -> &Self::NextType;
}
@@ -139,12 +139,12 @@ impl JoinHashMapType for JoinHashMap {
fn extend_zero(&mut self, _: usize) {}

/// Get mutable references to the hash map and the next.
- fn get_mut(&mut self) -> (&mut RawTable<(u64, u64)>, &mut Self::NextType) {
+ fn get_mut(&mut self) -> (&mut RawTable<(u64, u64, u64)>, &mut Self::NextType) {
(&mut self.map, &mut self.next)
}

/// Get a reference to the hash map.
- fn get_map(&self) -> &RawTable<(u64, u64)> {
+ fn get_map(&self) -> &RawTable<(u64, u64, u64)> {
&self.map
}

@@ -164,12 +164,12 @@ impl JoinHashMapType for PruningJoinHashMap {
}

/// Get mutable references to the hash map and the next.
- fn get_mut(&mut self) -> (&mut RawTable<(u64, u64)>, &mut Self::NextType) {
+ fn get_mut(&mut self) -> (&mut RawTable<(u64, u64, u64)>, &mut Self::NextType) {
(&mut self.map, &mut self.next)
}

/// Get a reference to the hash map.
- fn get_map(&self) -> &RawTable<(u64, u64)> {
+ fn get_map(&self) -> &RawTable<(u64, u64, u64)> {
&self.map
}

@@ -222,7 +222,7 @@ impl fmt::Debug for JoinHashMap {
/// ```
pub struct PruningJoinHashMap {
/// Stores hash value to last row index
- pub map: RawTable<(u64, u64)>,
+ pub map: RawTable<(u64, u64, u64)>,
/// Stores indices in chained list data structure
pub next: VecDeque<u64>,
}
@@ -262,7 +262,7 @@ impl PruningJoinHashMap {
if capacity > scale_factor * self.map.len() {
let new_capacity = (capacity * (scale_factor - 1)) / scale_factor;
// Resize the map with the new capacity.
- self.map.shrink_to(new_capacity, |(hash, _)| *hash)
+ self.map.shrink_to(new_capacity, |(hash, _, _)| *hash)
}
}
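Only the closure arity changes in this hunk; the shrink arithmetic itself is untouched. As a worked example with assumed numbers: if capacity is 1024, the map holds 100 entries, and scale_factor is 4, then 1024 > 4 * 100, so the map is shrunk to 1024 * (4 - 1) / 4 = 768 buckets.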

@@ -298,7 +298,7 @@ impl PruningJoinHashMap {
self.map
.iter()
.map(|bucket| bucket.as_ref())
- .filter_map(|(hash, tail_index)| {
+ .filter_map(|(hash, tail_index, _)| {
(*tail_index < prune_length as u64 + deleting_offset).then_some(*hash)
})
.collect::<Vec<_>>()
@@ -307,7 +307,7 @@
// Remove the keys from the map.
removable_keys.into_iter().for_each(|hash_value| {
self.map
- .remove_entry(hash_value, |(hash, _)| hash_value == *hash);
+ .remove_entry(hash_value, |(hash, _, _)| hash_value == *hash);
});

// Shrink the map if necessary.
@@ -1091,8 +1091,8 @@ pub mod tests {
for hash_value in 0..data_size {
join_hash_map.map.insert(
hash_value,
- (hash_value, hash_value),
- |(hash, _)| *hash,
+ (hash_value, hash_value, hash_value),
+ |(hash, _, _)| *hash,
);
}

@@ -1103,7 +1103,7 @@
for hash_value in 0..deleted_part {
join_hash_map
.map
- .remove_entry(hash_value, |(hash, _)| hash_value == *hash);
+ .remove_entry(hash_value, |(hash, _, _)| hash_value == *hash);
}

assert_eq!(join_hash_map.map.len(), (data_size - deleted_part) as usize);
