Skip to content

Commit

Permalink
feat(optimizer): support covered index selection for index join (risi…
Browse files Browse the repository at this point in the history
…ngwavelabs#6645)

support covered index selection for index join

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
chenzl25 and mergify[bot] authored Nov 29, 2022
1 parent 04514b8 commit 8103a13
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 23 deletions.
38 changes: 38 additions & 0 deletions e2e_test/batch/basic/index_join.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t (a int, b int);

statement ok
create table t2 (c int, d int);

statement ok
create index idx on t2(d) include (c);

statement ok
create index idx2 on t2(c, d);

statement ok
insert into t values(1, 111), (2, 222);

statement ok
insert into t2 values(1, 111), (2, 222);

query IIII rowsort
select * from t join t2 on t.b = t2.d;
----
1 111 1 111
2 222 2 222

query IIII rowsort
select * from t join t2 on t.a = t2.c and t.b = t2.d;
----
1 111 1 111
2 222 2 222

statement ok
drop table t;

statement ok
drop table t2;
25 changes: 25 additions & 0 deletions src/frontend/planner_test/tests/testdata/batch_index_join.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.
- sql: |
set rw_batch_enable_lookup_join = true;
create table t (a int, b int);
create table t2 (c int, d int);
create index idx on t2(d) include (c);
select * from t join t2 on t.b = t2.d;
batch_plan: |
BatchExchange { order: [], dist: Single }
└─BatchLookupJoin { type: Inner, predicate: t.b = idx.d, output: all }
└─BatchExchange { order: [], dist: UpstreamHashShard(t.b) }
└─BatchScan { table: t, columns: [t.a, t.b], distribution: SomeShard }
- sql: |
set rw_batch_enable_lookup_join = true;
create table t (a int, b int);
create table t2 (c int, d int);
create index idx on t2(d) include (c);
create index idx2 on t2(c, d);
create index idx3 on t2(c) include(d);
select * from t join t2 on t.a = t2.c and t.b = t2.d;
batch_plan: |
BatchExchange { order: [], dist: Single }
└─BatchLookupJoin { type: Inner, predicate: t.a = idx2.c AND t.b = idx2.d, output: all }
└─BatchExchange { order: [], dist: UpstreamHashShard(t.a, t.b) }
└─BatchScan { table: t, columns: [t.a, t.b], distribution: SomeShard }
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
LogicalFilter { predicate: (t.v1 = 10:Int32) AND (t.v2 = 20:Int32) AND (t.v3 = 30:Int32) }
└─LogicalTopN { order: "[t.v1 ASC]", limit: 10, offset: 0 }
└─LogicalScan { table: t, columns: [t.v1, t.v2, t.v3, t.v4] }
# TODO: hop's predicate pushdown https://github.com/risingwavelabs/risingwave/issues/6606
- name: filter hop transpose
sql: |
create table t(v1 int, v2 int, v3 int, v4 int, ts date);
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/optimizer/plan_node/batch_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ impl BatchLookupJoin {
batch_lookup_join.distributed_lookup = distributed_lookup;
batch_lookup_join
}

pub fn lookup_prefix_len(&self) -> usize {
self.lookup_prefix_len
}
}

impl fmt::Display for BatchLookupJoin {
Expand Down
109 changes: 87 additions & 22 deletions src/frontend/src/optimizer/plan_node/logical_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,17 +403,83 @@ impl LogicalJoin {
}
}

/// Index Join:
/// Try to convert logical join into batch lookup join and meanwhile it will do
/// the index selection for the lookup table so that we can benefit from indexes.
fn to_batch_lookup_join_with_index_selection(
&self,
predicate: EqJoinPredicate,
logical_join: LogicalJoin,
) -> Option<BatchLookupJoin> {
match logical_join.join_type() {
JoinType::Inner | JoinType::LeftOuter | JoinType::LeftSemi | JoinType::LeftAnti => {}
_ => return None,
};

// Index selection for index join.
let right = self.right();
// Lookup Join only supports basic tables on the join's right side.
let logical_scan: &LogicalScan = right.as_logical_scan()?;

let mut result_plan = None;
// Lookup primary table.
if let Some(lookup_join) =
self.to_batch_lookup_join(predicate.clone(), logical_join.clone())
{
result_plan = Some(lookup_join);
}

let required_col_idx = logical_scan.required_col_idx();
let indexes = logical_scan.indexes();
for index in indexes {
let p2s_mapping = index.primary_to_secondary_mapping();
if required_col_idx.iter().all(|x| p2s_mapping.contains_key(x)) {
// Covering index selection
let index_scan: PlanRef = logical_scan
.to_index_scan(
&index.name,
index.index_table.table_desc().into(),
p2s_mapping,
)
.into();

let that = self.clone_with_left_right(self.left(), index_scan.clone());
let new_logical_join = logical_join.clone_with_left_right(
logical_join.left(),
index_scan.to_batch().expect("index scan failed to batch"),
);

// Lookup covered index.
if let Some(lookup_join) =
that.to_batch_lookup_join(predicate.clone(), new_logical_join)
{
match &result_plan {
None => result_plan = Some(lookup_join),
Some(prev_lookup_join) => {
// Prefer to choose lookup join with longer lookup prefix len.
if prev_lookup_join.lookup_prefix_len()
< lookup_join.lookup_prefix_len()
{
result_plan = Some(lookup_join)
}
}
}
}
}
}

result_plan
}

/// Try to convert logical join into batch lookup join.
fn to_batch_lookup_join(
&self,
predicate: EqJoinPredicate,
logical_join: LogicalJoin,
) -> Option<PlanRef> {
) -> Option<BatchLookupJoin> {
match logical_join.join_type() {
JoinType::RightOuter
| JoinType::RightSemi
| JoinType::RightAnti
| JoinType::FullOuter => return None,
_ => {}
JoinType::Inner | JoinType::LeftOuter | JoinType::LeftSemi | JoinType::LeftAnti => {}
_ => return None,
};

let right = self.right();
Expand Down Expand Up @@ -563,17 +629,14 @@ impl LogicalJoin {
new_join_output_indices,
);

Some(
BatchLookupJoin::new(
new_logical_join,
new_predicate,
table_desc,
new_scan_output_column_ids,
lookup_prefix_len,
false,
)
.into(),
)
Some(BatchLookupJoin::new(
new_logical_join,
new_predicate,
table_desc,
new_scan_output_column_ids,
lookup_prefix_len,
false,
))
}

pub fn decompose(self) -> (PlanRef, PlanRef, Condition, JoinType, Vec<usize>) {
Expand Down Expand Up @@ -1015,7 +1078,8 @@ impl LogicalJoin {

Ok(self
.to_batch_lookup_join(predicate, logical_join)
.expect("Fail to convert to lookup join"))
.expect("Fail to convert to lookup join")
.into())
}

fn to_batch_nested_loop_join(
Expand Down Expand Up @@ -1044,10 +1108,11 @@ impl ToBatch for LogicalJoin {

if predicate.has_eq() {
if config.get_batch_enable_lookup_join() {
if let Some(lookup_join) =
self.to_batch_lookup_join(predicate.clone(), logical_join.clone())
{
return Ok(lookup_join);
if let Some(lookup_join) = self.to_batch_lookup_join_with_index_selection(
predicate.clone(),
logical_join.clone(),
) {
return Ok(lookup_join.into());
}
}

Expand Down

0 comments on commit 8103a13

Please sign in to comment.