Skip to content

Commit

Permalink
feat: add support for nested loop join (#164)
Browse files Browse the repository at this point in the history
* add support for nested loop join

* fix:keep the order of tuple and schema

* fix:right join not matched case

* fix: expression ownership in the temp table of subquery

* fix: bind error of the same fields in different tables in `in subquery`

* fix: combine_operators.rs deletes the temp table alias of subquery

* test: add `F041_08.slt`/`sqlite.slt`/`subquery.slt`

---------

Co-authored-by: Kould <[email protected]>
  • Loading branch information
crwen and KKould authored Mar 19, 2024
1 parent b8d5416 commit f44ec4f
Show file tree
Hide file tree
Showing 15 changed files with 907 additions and 101 deletions.
35 changes: 21 additions & 14 deletions src/binder/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,34 +110,35 @@ impl<'a, T: Transaction> Binder<'a, T> {
}),
Expr::Subquery(subquery) => {
let (sub_query, column) = self.bind_subquery(subquery)?;
self.context.sub_query(SubQueryType::SubQuery(sub_query));

if self.context.is_step(&QueryBindStep::Where) {
Ok(self.bind_temp_column(column))
let (expr, sub_query) = if !self.context.is_step(&QueryBindStep::Where) {
self.bind_temp_table(column, sub_query)?
} else {
Ok(ScalarExpression::ColumnRef(column))
}
(ScalarExpression::ColumnRef(column), sub_query)
};
self.context.sub_query(SubQueryType::SubQuery(sub_query));
Ok(expr)
}
Expr::InSubquery {
expr,
subquery,
negated,
} => {
let left_expr = Box::new(self.bind_expr(expr)?);
let (sub_query, column) = self.bind_subquery(subquery)?;
self.context
.sub_query(SubQueryType::InSubQuery(*negated, sub_query));

if !self.context.is_step(&QueryBindStep::Where) {
return Err(DatabaseError::UnsupportedStmt(
"'IN (SUBQUERY)' can only appear in `WHERE`".to_string(),
));
}

let alias_expr = self.bind_temp_column(column);
let (alias_expr, sub_query) = self.bind_temp_table(column, sub_query)?;
self.context
.sub_query(SubQueryType::InSubQuery(*negated, sub_query));

Ok(ScalarExpression::Binary {
op: expression::BinaryOperator::Eq,
left_expr: Box::new(self.bind_expr(expr)?),
left_expr,
right_expr: Box::new(alias_expr),
ty: LogicalType::Boolean,
})
Expand Down Expand Up @@ -203,16 +204,22 @@ impl<'a, T: Transaction> Binder<'a, T> {
}
}

fn bind_temp_column(&mut self, column: ColumnRef) -> ScalarExpression {
fn bind_temp_table(
&mut self,
column: ColumnRef,
sub_query: LogicalPlan,
) -> Result<(ScalarExpression, LogicalPlan), DatabaseError> {
let mut alias_column = ColumnCatalog::clone(&column);
alias_column.set_table_name(self.context.temp_table());

ScalarExpression::Alias {
let alias_expr = ScalarExpression::Alias {
expr: Box::new(ScalarExpression::ColumnRef(column)),
alias: AliasType::Expr(Box::new(ScalarExpression::ColumnRef(Arc::new(
alias_column,
)))),
}
};
let alias_plan = self.bind_project(sub_query, vec![alias_expr.clone()])?;
Ok((alias_expr, alias_plan))
}

fn bind_subquery(
Expand Down Expand Up @@ -289,7 +296,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
} else {
// handle col syntax
let mut got_column = None;
for table_catalog in self.context.bind_table.values() {
for table_catalog in self.context.bind_table.values().rev() {
if let Some(column_catalog) = table_catalog.get_column_by_name(&column_name) {
got_column = Some(column_catalog);
}
Expand Down
15 changes: 12 additions & 3 deletions src/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
// Resolve scalar function call.
// TODO support SRF(Set-Returning Function).

let mut select_list = self.normalize_select_item(&select.projection)?;
let mut select_list = self.normalize_select_item(&select.projection, &plan)?;

if let Some(predicate) = &select.selection {
plan = self.bind_where(plan, predicate)?;
Expand Down Expand Up @@ -341,6 +341,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
fn normalize_select_item(
&mut self,
items: &[SelectItem],
plan: &LogicalPlan,
) -> Result<Vec<ScalarExpression>, DatabaseError> {
let mut select_items = vec![];

Expand All @@ -359,6 +360,9 @@ impl<'a, T: Transaction> Binder<'a, T> {
});
}
SelectItem::Wildcard(_) => {
if let Operator::Project(op) = &plan.operator {
return Ok(op.exprs.clone());
}
for (table_name, _) in self.context.bind_table.keys() {
self.bind_table_column_refs(&mut select_items, table_name.clone())?;
}
Expand Down Expand Up @@ -510,7 +514,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
Ok(FilterOperator::build(having, children, true))
}

fn bind_project(
pub(crate) fn bind_project(
&mut self,
children: LogicalPlan,
select_list: Vec<ScalarExpression>,
Expand Down Expand Up @@ -791,7 +795,12 @@ impl<'a, T: Transaction> Binder<'a, T> {
)?;
}
BinaryOperator::Or => {
todo!("`NestLoopJoin` is not supported yet")
accum_filter.push(ScalarExpression::Binary {
left_expr,
right_expr,
op,
ty,
});
}
_ => {
if left_expr.referenced_columns(true).iter().all(|column| {
Expand Down
1 change: 1 addition & 0 deletions src/execution/volcano/dql/join/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::planner::operator::join::JoinType;

pub(crate) mod hash_join;
pub(crate) mod nested_loop_join;

pub fn joins_nullable(join_type: &JoinType) -> (bool, bool) {
match join_type {
Expand Down
Loading

0 comments on commit f44ec4f

Please sign in to comment.