Skip to content

Commit

Permalink
support full join for nested loop join
Browse files Browse the repository at this point in the history
  • Loading branch information
crwen committed Sep 8, 2024
1 parent a4cdca4 commit 6684d17
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 63 deletions.
97 changes: 88 additions & 9 deletions src/execution/dql/join/nested_loop_join.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! Defines the nested loop join executor, it supports [`JoinType::Inner`], [`JoinType::LeftOuter`],
//! [`JoinType::LeftSemi`], [`JoinType::LeftAnti`], [`JoinType::RightOuter`], [`JoinType::Cross`].
//! But [`JoinType::Full`] is not supported.
//! [`JoinType::LeftSemi`], [`JoinType::LeftAnti`], [`JoinType::RightOuter`], [`JoinType::Cross`], [`JoinType::Full`].
use super::joins_nullable;
use crate::catalog::{ColumnCatalog, ColumnRef};
Expand All @@ -14,6 +13,7 @@ use crate::storage::{StatisticsMetaCache, TableCache, Transaction};
use crate::throw;
use crate::types::tuple::{Schema, SchemaRef, Tuple};
use crate::types::value::{DataValue, NULL_VALUE};
use crate::utils::bit_vector::BitVector;
use itertools::Itertools;
use std::ops::Coroutine;
use std::ops::CoroutineState;
Expand Down Expand Up @@ -73,7 +73,7 @@ impl EqualCondition {
/// |--------------------------------|----------------|----------------|
/// | Right/RightSemi/RightAnti/Full | left | right |
/// |--------------------------------|----------------|----------------|
/// | Full | not supported | not supported |
/// | Full | left | right |
pub struct NestedLoopJoin {
left_input: LogicalPlan,
right_input: LogicalPlan,
Expand Down Expand Up @@ -144,11 +144,10 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for NestedLoopJoin {
..
} = self;

if matches!(self.ty, JoinType::Full) {
unreachable!("{} cannot be handled in nested loop join", self.ty)
}
let right_schema_len = eq_cond.right_schema.len();
let mut left_coroutine = build_read(left_input, cache, transaction);
let mut bitmap: Option<BitVector> = None;
let mut first_matches = Vec::new();

while let CoroutineState::Yielded(left_tuple) =
Pin::new(&mut left_coroutine).resume(())
Expand All @@ -157,6 +156,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for NestedLoopJoin {
let mut has_matched = false;

let mut right_coroutine = build_read(right_input.clone(), cache, transaction);
let mut right_idx = 0;

while let CoroutineState::Yielded(right_tuple) =
Pin::new(&mut right_coroutine).resume(())
Expand Down Expand Up @@ -210,16 +210,29 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for NestedLoopJoin {
if matches!(ty, JoinType::LeftSemi) {
break;
}
if let Some(bits) = bitmap.as_mut() {
bits.set_bit(right_idx, true);
} else if matches!(ty, JoinType::Full) {
first_matches.push(right_idx);
}
}
if matches!(ty, JoinType::LeftAnti) && has_matched {
break;
}
right_idx += 1;
}

if matches!(self.ty, JoinType::Full) && bitmap.is_none() {
bitmap = Some(BitVector::new(right_idx));
}

// handle no matched tuple case
let tuple = match ty {
JoinType::LeftAnti if !has_matched => Some(left_tuple.clone()),
JoinType::LeftOuter | JoinType::LeftSemi | JoinType::RightOuter
JoinType::LeftOuter
| JoinType::LeftSemi
| JoinType::RightOuter
| JoinType::Full
if !has_matched =>
{
let right_tuple = Tuple {
Expand All @@ -238,6 +251,27 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for NestedLoopJoin {
yield Ok(tuple)
}
}

// Full join epilogue: after every left tuple has been processed, emit each right tuple
// that never matched, with NULLs in place of the left-side columns.
if matches!(ty, JoinType::Full) {
    // Matches found while scanning for the first left tuple were buffered in
    // `first_matches` because the bitmap is only allocated once that first scan has
    // counted the right rows. Replay them now. When the left input was empty no scan
    // ever ran, so there is no bitmap and nothing was buffered.
    if let Some(bits) = bitmap.as_mut() {
        for idx in first_matches {
            bits.set_bit(idx, true);
        }
    }

    let mut right_coroutine = build_read(right_input.clone(), cache, transaction);
    let mut idx = 0;
    while let CoroutineState::Yielded(right_tuple) =
        Pin::new(&mut right_coroutine).resume(())
    {
        // A missing bitmap means no left tuple existed, hence no right row can have
        // matched; treating it as "unmatched" avoids unwrapping `None`.
        let matched = bitmap.as_ref().is_some_and(|bits| bits.get_bit(idx));
        if !matched {
            let mut right_tuple: Tuple = throw!(right_tuple);
            // NOTE(review): this padding stands in for the left-side columns but is
            // sized with `right_schema_len`; that only lines up when both inputs have
            // the same number of columns — confirm against the left schema.
            let mut values = vec![NULL_VALUE.clone(); right_schema_len];
            values.append(&mut right_tuple.values);

            yield Ok(Tuple { id: None, values })
        }
        idx += 1;
    }
}
},
)
}
Expand Down Expand Up @@ -265,7 +299,7 @@ impl NestedLoopJoin {
.collect_vec();
match ty {
JoinType::Inner | JoinType::Cross | JoinType::LeftSemi if !is_matched => values.clear(),
JoinType::LeftOuter if !is_matched => {
JoinType::LeftOuter | JoinType::Full if !is_matched => {
values
.iter_mut()
.skip(left_len)
Expand All @@ -284,7 +318,6 @@ impl NestedLoopJoin {
values.truncate(left_len);
}
}
JoinType::Full => todo!("Not support now."),
_ => (),
};

Expand Down Expand Up @@ -740,4 +773,50 @@ mod test {

Ok(())
}

/// Runs a `JoinType::Full` nested loop join over the shared join fixture (with the extra
/// filter enabled) and checks both the first emitted tuple and the complete result set.
#[test]
fn test_nested_full_join() -> Result<(), DatabaseError> {
    let temp_dir = TempDir::new().expect("unable to create temporary working directory");
    let storage = RocksStorage::new(temp_dir.path())?;
    let transaction = storage.transaction()?;
    let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
    let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
    let (keys, left, right, filter) = build_join_values(true);
    let op = JoinOperator {
        on: JoinCondition::On {
            on: keys,
            filter: Some(filter),
        },
        join_type: JoinType::Full,
    };
    let executor = NestedLoopJoin::from((op, left, right))
        .execute((&table_cache, &meta_cache), &transaction);
    let tuples = try_collect(executor)?;

    // `assert_eq!` rather than `debug_assert_eq!`: the debug variant is compiled out when
    // debug assertions are disabled (e.g. `cargo test --release`), which would silently
    // drop this check.
    assert_eq!(
        tuples[0].values,
        build_integers(vec![Some(0), Some(2), Some(4), None, None, None])
    );

    // Expected rows: left tuples (matched, or NULL-padded on the right when unmatched)
    // followed by unmatched right tuples NULL-padded on the left.
    let mut expected_set = HashSet::with_capacity(7);
    for row in [
        vec![Some(0), Some(2), Some(4), None, None, None],
        vec![Some(1), Some(2), Some(5), Some(0), Some(2), Some(4)],
        vec![Some(1), Some(3), Some(5), None, None, None],
        vec![Some(3), Some(5), Some(7), None, None, None],
        vec![None, None, None, Some(1), Some(3), Some(5)],
        vec![None, None, None, Some(4), Some(6), Some(8)],
        vec![None, None, None, Some(1), Some(1), Some(1)],
    ] {
        expected_set.insert(build_integers(row));
    }

    valid_result(&mut expected_set, &tuples);

    Ok(())
}
}
103 changes: 49 additions & 54 deletions tests/slt/crdb/join.slt
Original file line number Diff line number Diff line change
Expand Up @@ -144,30 +144,28 @@ null null 0 43
0 44 null null
1 null null null

# TODO: Full Join on nested loop join
# query II
# SELECT * FROM onecolumn AS a full OUTER JOIN othercolumn AS b ON false order by b.x
# ----
# 42 NULL
# 44 NULL
# NULL 16
# NULL 42
# NULL 43
# NULL NULL
query II
SELECT a.x, b.x FROM onecolumn AS a full OUTER JOIN othercolumn AS b ON false order by a.x, b.x
----
42 null
44 null
null 16
null 42
null 43
null null

# TODO: Full Join on nested loop join
# query II
# SELECT * FROM onecolumn AS a full OUTER JOIN othercolumn AS b ON true order by b.x
# ----
# 42 16
# 42 42
# 42 43
# 44 16
# 44 42
# 44 43
# NULL 16
# NULL 42
# NULL 43
query II
SELECT a.x, b.x FROM onecolumn AS a full OUTER JOIN othercolumn AS b ON true order by a.x, b.x
----
42 16
42 42
42 43
44 16
44 42
44 43
null 16
null 42
null 43

# TODO: Full Join on nested loop join
# query
Expand Down Expand Up @@ -236,7 +234,6 @@ SELECT * FROM onecolumn AS a(aid, x) RIGHT OUTER JOIN empty AS b(bid, y) ON a.x
statement ok
SELECT * FROM onecolumn AS a RIGHT OUTER JOIN empty AS b USING(x)

# TODO: Full Join on nested loop join
query II
SELECT * FROM empty AS a(aid, x) FULL OUTER JOIN onecolumn AS b(bid, y) ON a.x = b.y ORDER BY b.y
----
Expand Down Expand Up @@ -534,36 +531,34 @@ SELECT * FROM pairs, square WHERE pairs.a + pairs.b = square.sq
# query
# SELECT a, b, n, sq FROM (SELECT a, b, a * b / 2 AS div, n, sq FROM pairs, square) WHERE div = sq

# TODO: Full Join on nested loop join
# query IIII
# SELECT * FROM pairs FULL OUTER JOIN square ON pairs.a + pairs.b = square.sq order by a
# ----
# 1 1 NULL NULL
# 1 2 NULL NULL
# 1 3 2 4
# 1 4 NULL NULL
# 1 5 NULL NULL
# 1 6 NULL NULL
# 2 3 NULL NULL
# 2 4 NULL NULL
# 2 5 NULL NULL
# 2 6 NULL NULL
# 3 4 NULL NULL
# 3 5 NULL NULL
# 3 6 3 9
# 4 5 3 9
# 4 6 NULL NULL
# NULL NULL 1 1
# NULL NULL 4 16
# NULL NULL 5 25
# NULL NULL 6 36
query IIII
SELECT a, b, n, sq FROM pairs FULL OUTER JOIN square ON pairs.a + pairs.b = square.sq order by a
----
1 1 null null
1 2 null null
1 3 2 4
1 4 null null
1 5 null null
1 6 null null
2 3 null null
2 4 null null
2 5 null null
2 6 null null
3 4 null null
3 5 null null
3 6 3 9
4 5 3 9
4 6 null null
null null 1 1
null null 4 16
null null 5 25
null null 6 36

# TODO: Full Join on nested loop join
# query IIII
# SELECT * FROM pairs FULL OUTER JOIN square ON pairs.a + pairs.b = square.sq WHERE pairs.b%2 <> square.sq%2 order by a
# ----
# 1 3 2 4
# 3 6 3 9
query IIII
SELECT pairs.a, pairs.b, square.* FROM pairs FULL OUTER JOIN square ON pairs.a + pairs.b = square.sq WHERE pairs.b%2 <> square.sq%2 order by a
----
1 3 2 4
3 6 3 9

query IITT rowsort
SELECT * FROM (SELECT * FROM pairs LEFT JOIN square ON b = sq AND a > 1 AND n < 6) WHERE b > 1 AND (n IS NULL OR n > 1) AND (n IS NULL OR a < sq)
Expand Down Expand Up @@ -1088,4 +1083,4 @@ select * from onecolumn as a left join twocolumn as b on a.x = b.x where b.x > 4
# SELECT abcd.*, dxby.* FROM abcd NATURAL FULL OUTER JOIN dxby

# query
# SELECT abcd.*, dxby.* FROM abcd INNER JOIN dxby USING (d, b)
# SELECT abcd.*, dxby.* FROM abcd INNER JOIN dxby USING (d, b)

0 comments on commit 6684d17

Please sign in to comment.