Skip to content

Commit

Permalink
additional arguments
Browse files Browse the repository at this point in the history
  • Loading branch information
korowa committed Dec 20, 2023
1 parent 6f5230f commit 62d258e
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 9 deletions.
37 changes: 28 additions & 9 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use crate::joins::utils::{
need_produce_result_in_final, JoinHashMap, JoinHashMapType,
};
use crate::{
coalesce_batches::concat_batches,
coalesce_partitions::CoalescePartitionsExec,
expressions::Column,
expressions::PhysicalSortExpr,
Expand All @@ -55,7 +54,7 @@ use arrow::array::{
UInt32BufferBuilder, UInt64Array, UInt64BufferBuilder,
};
use arrow::compute::kernels::cmp::{eq, not_distinct};
use arrow::compute::{and, take, FilterBuilder};
use arrow::compute::{and, concat_batches, take, FilterBuilder};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow::util::bit_util;
Expand Down Expand Up @@ -715,7 +714,7 @@ async fn collect_left_input(
let mut hashmap = JoinHashMap::with_capacity(num_rows);
let mut hashes_buffer = Vec::new();
let mut offset = 0;
for batch in batches.iter() {
for batch in batches.iter().rev() {
hashes_buffer.clear();
hashes_buffer.resize(batch.num_rows(), 0);
update_hash(
Expand All @@ -726,12 +725,13 @@ async fn collect_left_input(
&random_state,
&mut hashes_buffer,
0,
true,
)?;
offset += batch.num_rows();
}
// Merge all batches into a single batch, so we
// can directly index into the arrays
let single_batch = concat_batches(&schema, &batches, num_rows)?;
let single_batch = concat_batches(&schema, batches.iter().rev())?;
let data = JoinLeftData::new(hashmap, single_batch, reservation);

Ok(data)
Expand All @@ -747,6 +747,7 @@ pub fn update_hash<T>(
random_state: &RandomState,
hashes_buffer: &mut Vec<u64>,
deleted_offset: usize,
use_reverse_order: bool,
) -> Result<()>
where
T: JoinHashMapType,
Expand All @@ -763,9 +764,15 @@ where
// For usual JoinHashmap, the implementation is void.
hash_map.extend_zero(batch.num_rows());

let hash_values_iter: Box<dyn Iterator<Item=(usize, &u64)>> = if use_reverse_order {
Box::new(hash_values.iter().enumerate().rev())
} else {
Box::new(hash_values.iter().enumerate())
};

// insert hashes to key of the hashmap
let (mut_map, mut_list) = hash_map.get_mut();
for (row, hash_value) in hash_values.iter().enumerate() {
for (row, hash_value) in hash_values_iter {
let item = mut_map.get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
if let Some((_, index)) = item {
// Already exists: add index to next array
Expand Down Expand Up @@ -987,6 +994,7 @@ pub fn build_equal_condition_join_indices<T: JoinHashMapType>(
filter: Option<&JoinFilter>,
build_side: JoinSide,
deleted_offset: Option<usize>,
use_reverse_order: bool,
) -> Result<(UInt64Array, UInt32Array)> {
let keys_values = probe_on
.iter()
Expand Down Expand Up @@ -1034,9 +1042,15 @@ pub fn build_equal_condition_join_indices<T: JoinHashMapType>(
// (5,1)
//
// With this approach, the lexicographic order on both the probe side and the build side is preserved.
let hash_values_iter: Box<dyn Iterator<Item=(usize, &u64)>> = if use_reverse_order {
Box::new(hash_values.iter().enumerate().rev())
} else {
Box::new(hash_values.iter().enumerate())
};

let hash_map = build_hashmap.get_map();
let next_chain = build_hashmap.get_list();
for (row, hash_value) in hash_values.iter().enumerate().rev() {
for (row, hash_value) in hash_values_iter {
// Get the hash and find it in the build index

// For every item on the build and probe we check if it matches
Expand Down Expand Up @@ -1069,9 +1083,12 @@ pub fn build_equal_condition_join_indices<T: JoinHashMapType>(
}
}
}
// Reversing both sets of indices
build_indices.as_slice_mut().reverse();
probe_indices.as_slice_mut().reverse();

if use_reverse_order {
// Reversing both sets of indices
build_indices.as_slice_mut().reverse();
probe_indices.as_slice_mut().reverse();
}

let left: UInt64Array = PrimitiveArray::new(build_indices.finish().into(), None);
let right: UInt32Array = PrimitiveArray::new(probe_indices.finish().into(), None);
Expand Down Expand Up @@ -1279,6 +1296,7 @@ impl HashJoinStream {
self.filter.as_ref(),
JoinSide::Left,
None,
false,
);

let result = match left_right_indices {
Expand Down Expand Up @@ -2734,6 +2752,7 @@ mod tests {
None,
JoinSide::Left,
None,
false,
)?;

let mut left_ids = UInt64Builder::with_capacity(0);
Expand Down
2 changes: 2 additions & 0 deletions datafusion/physical-plan/src/joins/symmetric_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,7 @@ pub(crate) fn join_with_probe_batch(
filter,
build_hash_joiner.build_side,
Some(build_hash_joiner.deleted_offset),
true,
)?;
if need_to_produce_result_in_final(build_hash_joiner.build_side, join_type) {
record_visited_indices(
Expand Down Expand Up @@ -882,6 +883,7 @@ impl OneSideHashJoiner {
random_state,
&mut self.hashes_buffer,
self.deleted_offset,
false,
)?;
Ok(())
}
Expand Down

0 comments on commit 62d258e

Please sign in to comment.