Skip to content

Commit

Permalink
feat(batch): lookup join supports lookup inner side table's prefix of the order key (risingwavelabs#6588)
Browse files Browse the repository at this point in the history

* lookup join support lookup pk prefix

* fix dashboard

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
chenzl25 and mergify[bot] authored Nov 29, 2022
1 parent c2464af commit 7ece244
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 48 deletions.
28 changes: 28 additions & 0 deletions dashboard/proto/gen/batch_plan.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -220,5 +220,36 @@ drop table t1;
statement ok
drop table t2;

statement ok
create table t1(a int, b int);

statement ok
create table t2(c int, d int);

statement ok
create index idx on t2(c) include(d);

statement ok
insert into t1 values (1,222);

statement ok
insert into t2 values (1,222);

query IIII
select * from t1 join idx on t1.a = idx.c;
----
1 222 1 222

query IIII
select * from t1 join idx on t1.a = idx.c and t1.b = idx.d;
----
1 222 1 222

statement ok
drop table t1;

statement ok
drop table t2;

statement ok
set rw_batch_enable_lookup_join to false;
24 changes: 14 additions & 10 deletions proto/batch_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -217,14 +217,16 @@ message LocalLookupJoinNode {
plan_common.JoinType join_type = 1;
expr.ExprNode condition = 2;
repeated uint32 outer_side_key = 3;
plan_common.StorageTableDesc inner_side_table_desc = 4;
repeated uint32 inner_side_vnode_mapping = 5;
repeated int32 inner_side_column_ids = 6;
repeated uint32 output_indices = 7;
repeated common.WorkerNode worker_nodes = 8;
repeated uint32 inner_side_key = 4;
uint32 lookup_prefix_len = 5;
plan_common.StorageTableDesc inner_side_table_desc = 6;
repeated uint32 inner_side_vnode_mapping = 7;
repeated int32 inner_side_column_ids = 8;
repeated uint32 output_indices = 9;
repeated common.WorkerNode worker_nodes = 10;
// Null safe means it treats `null = null` as true.
// Each key pair can be null safe independently. (left_key, right_key, null_safe)
repeated bool null_safe = 9;
repeated bool null_safe = 11;
}

// RFC: A new scheduling approach for distributed lookup join
Expand All @@ -233,12 +235,14 @@ message DistributedLookupJoinNode {
plan_common.JoinType join_type = 1;
expr.ExprNode condition = 2;
repeated uint32 outer_side_key = 3;
plan_common.StorageTableDesc inner_side_table_desc = 4;
repeated int32 inner_side_column_ids = 5;
repeated uint32 output_indices = 6;
repeated uint32 inner_side_key = 4;
uint32 lookup_prefix_len = 5;
plan_common.StorageTableDesc inner_side_table_desc = 6;
repeated int32 inner_side_column_ids = 7;
repeated uint32 output_indices = 8;
// Null safe means it treats `null = null` as true.
// Each key pair can be null safe independently. (left_key, right_key, null_safe)
repeated bool null_safe = 7;
repeated bool null_safe = 9;
}

message UnionNode {}
Expand Down
62 changes: 44 additions & 18 deletions src/batch/src/executor/join/distributed_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
use std::marker::PhantomData;
use std::mem::swap;

use futures::pin_mut;
use itertools::Itertools;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId, TableOption};
use risingwave_common::error::{internal_error, Result};
use risingwave_common::error::Result;
use risingwave_common::hash::{HashKey, HashKeyDispatcher};
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, Datum};
Expand All @@ -31,7 +32,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::expr::expr_node::Type;
use risingwave_pb::plan_common::OrderType as ProstOrderType;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::Distribution;
use risingwave_storage::table::{Distribution, TableIter};
use risingwave_storage::{dispatch_state_store, StateStore};

use crate::executor::join::JoinType;
Expand Down Expand Up @@ -152,15 +153,12 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {
.map(|&i| outer_side_data_types[i].clone())
.collect_vec();

let lookup_prefix_len: usize =
distributed_lookup_join_node.get_lookup_prefix_len() as usize;

let mut inner_side_key_idxs = vec![];
for pk in &table_desc.pk {
let key_idx = inner_side_column_ids
.iter()
.position(|&i| table_desc.columns[pk.index as usize].column_id == i)
.ok_or_else(|| {
internal_error("Inner side key is not part of its output columns")
})?;
inner_side_key_idxs.push(key_idx);
for inner_side_key in distributed_lookup_join_node.get_inner_side_key() {
inner_side_key_idxs.push(*inner_side_key as usize)
}

let inner_side_key_types = inner_side_key_idxs
Expand Down Expand Up @@ -232,6 +230,7 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {
let inner_side_builder = InnerSideExecutorBuilder::new(
outer_side_key_types,
inner_side_key_types.clone(),
lookup_prefix_len,
source.epoch(),
vec![],
table,
Expand All @@ -248,6 +247,7 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {
inner_side_key_types,
inner_side_key_idxs,
null_safe,
lookup_prefix_len,
chunk_builder: DataChunkBuilder::new(original_schema.data_types(), chunk_size),
schema: actual_schema,
output_indices,
Expand All @@ -269,6 +269,7 @@ struct DistributedLookupJoinExecutorArgs {
inner_side_key_types: Vec<DataType>,
inner_side_key_idxs: Vec<usize>,
null_safe: Vec<bool>,
lookup_prefix_len: usize,
chunk_builder: DataChunkBuilder,
schema: Schema,
output_indices: Vec<usize>,
Expand All @@ -291,6 +292,7 @@ impl HashKeyDispatcher for DistributedLookupJoinExecutorArgs {
inner_side_key_types: self.inner_side_key_types,
inner_side_key_idxs: self.inner_side_key_idxs,
null_safe: self.null_safe,
lookup_prefix_len: self.lookup_prefix_len,
chunk_builder: self.chunk_builder,
schema: self.schema,
output_indices: self.output_indices,
Expand All @@ -310,6 +312,7 @@ impl HashKeyDispatcher for DistributedLookupJoinExecutorArgs {
struct InnerSideExecutorBuilder<S: StateStore> {
outer_side_key_types: Vec<DataType>,
inner_side_key_types: Vec<DataType>,
lookup_prefix_len: usize,
epoch: u64,
row_list: Vec<Row>,
table: StorageTable<S>,
Expand All @@ -320,6 +323,7 @@ impl<S: StateStore> InnerSideExecutorBuilder<S> {
fn new(
outer_side_key_types: Vec<DataType>,
inner_side_key_types: Vec<DataType>,
lookup_prefix_len: usize,
epoch: u64,
row_list: Vec<Row>,
table: StorageTable<S>,
Expand All @@ -328,6 +332,7 @@ impl<S: StateStore> InnerSideExecutorBuilder<S> {
Self {
outer_side_key_types,
inner_side_key_types,
lookup_prefix_len,
epoch,
row_list,
table,
Expand All @@ -348,8 +353,16 @@ impl<S: StateStore> LookupExecutorBuilder for InnerSideExecutorBuilder<S> {

for ((datum, outer_type), inner_type) in key_datums
.into_iter()
.zip_eq(self.outer_side_key_types.iter())
.zip_eq(self.inner_side_key_types.iter())
.zip_eq(
self.outer_side_key_types
.iter()
.take(self.lookup_prefix_len),
)
.zip_eq(
self.inner_side_key_types
.iter()
.take(self.lookup_prefix_len),
)
{
let datum = if inner_type == outer_type {
datum
Expand All @@ -367,13 +380,26 @@ impl<S: StateStore> LookupExecutorBuilder for InnerSideExecutorBuilder<S> {
}

let pk_prefix = Row::new(scan_range.eq_conds);
let row = self
.table
.get_row(&pk_prefix, HummockReadEpoch::Committed(self.epoch))
.await?;

if let Some(row) = row {
self.row_list.push(row);
if self.lookup_prefix_len == self.table.pk_indices().len() {
let row = self
.table
.get_row(&pk_prefix, HummockReadEpoch::Committed(self.epoch))
.await?;

if let Some(row) = row {
self.row_list.push(row);
}
} else {
let iter = self
.table
.batch_iter_with_pk_bounds(HummockReadEpoch::Committed(self.epoch), &pk_prefix, ..)
.await?;

pin_mut!(iter);
while let Some(row) = iter.next_row().await? {
self.row_list.push(row);
}
}

Ok(())
Expand Down
Loading

0 comments on commit 7ece244

Please sign in to comment.