fix: Check HashJoin exists on keys and CNF and DNF inference (#149)

* invalid: `NestLoopJoin` is currently not supported, so when a Join condition contains `Or`, the corresponding exception is thrown.
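
For context, a hash join can only use equality keys, while an `ON` clause that contains `Or` needs a strategy that evaluates an arbitrary predicate for every row pair. A minimal nested-loop sketch of that idea — toy in-memory types only, not the crate's planned `NestLoopJoin`:

```rust
// Illustrative only: a toy nested-loop join over in-memory rows, showing why a
// predicate containing `Or` cannot be handled by hashing on equi-keys alone.
// The names and types here are assumptions, not FnckSQL's actual API.
fn nested_loop_join<L: Clone, R: Clone>(
    left: &[L],
    right: &[R],
    predicate: impl Fn(&L, &R) -> bool,
) -> Vec<(L, R)> {
    let mut out = Vec::new();
    for l in left {
        for r in right {
            // Every pair is examined, so any boolean condition works here,
            // including `l.a = r.a OR l.b = r.b`; a hash join has no such option.
            if predicate(l, r) {
                out.push((l.clone(), r.clone()));
            }
        }
    }
    out
}

fn main() {
    let left = [(1, "a"), (2, "b")];
    let right = [(1, "x"), (3, "y")];
    // An `Or` predicate: join on key equality or on a fixed right-side id.
    let joined = nested_loop_join(&left, &right, |l, r| l.0 == r.0 || r.0 == 3);
    assert_eq!(joined.len(), 3);
}
```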

* perf: when `And` contains multiple `Eq` conditions, the effect of `Dummy` is returned, and the `Dummy` and `Unbound` cases are now distinguished: the empty `Vec` that originally represented `Unbound` is replaced by `ScopeUnbound`, and an empty `Vec` now represents `Dummy` (sketched below)

invalid: when the index is `PrimaryKey`, the cost is no longer multiplied by 2.
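
Put differently, an empty range set now means "this predicate matches no rows" (`Dummy`), while "no restriction at all" is represented explicitly. A hedged sketch of that aggregation rule, using placeholder types rather than the real `ConstantBinary`:

```rust
// Hypothetical sketch of the Dummy / Unbound distinction; the names below are
// illustrative, not the crate's actual ConstantBinary variants.
#[derive(Debug, PartialEq)]
enum IndexRange {
    /// The predicate can never match, e.g. `x = 1 AND x = 2`.
    Dummy,
    /// The predicate does not restrict the column at all.
    Unbound,
    /// A single point produced by `x = v`.
    Eq(i64),
}

/// Aggregate the `Eq` terms under an `And`: no terms leave the column
/// unrestricted, identical terms pin it to one point, and conflicting terms
/// can never all hold at once.
fn and_eq_aggregation(eqs: &[i64]) -> IndexRange {
    match eqs {
        [] => IndexRange::Unbound,
        [v, rest @ ..] if rest.iter().all(|x| x == v) => IndexRange::Eq(*v),
        _ => IndexRange::Dummy,
    }
}

fn main() {
    assert_eq!(and_eq_aggregation(&[]), IndexRange::Unbound);
    assert_eq!(and_eq_aggregation(&[7, 7]), IndexRange::Eq(7));
    assert_eq!(and_eq_aggregation(&[1, 2]), IndexRange::Dummy);
}
```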

* fix: check HashJoin's eq conditions and fix the problem of `on` conditions going missing across multi-layer Joins ...

- invalid: when there is no equi-join (`Eq`) condition in HashJoin, an exception is thrown
- fix: the problem of `on` conditions going missing across multi-layer Joins
- fix: conditions being classified as `And` when both `And` and `Or` are present (see the sketch after this list)
- perf: when `And` contains multiple `Eq` conditions at the same time, return `Dummy`
- fix: when the range in `Or` is `Unbound`, `Dummy` is returned
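
A condensed sketch of the classification those points describe: split an `ON` expression into hash-join equi-keys and residual filters, and reject `Or` until `NestLoopJoin` exists. The expression type and function below are simplified stand-ins, not the binder's real `extract_join_keys`:

```rust
// Simplified stand-in for the binder's join-condition walk; `Expr` is a toy
// expression type, not FnckSQL's ScalarExpression.
enum Expr {
    Column(&'static str),
    Eq(Box<Expr>, Box<Expr>),
    And(Box<Expr>, Box<Expr>),
    Or(Box<Expr>, Box<Expr>),
}

fn extract_join_keys(
    expr: Expr,
    on_keys: &mut Vec<(Expr, Expr)>,
    filters: &mut Vec<Expr>,
) -> Result<(), String> {
    match expr {
        // `And` stays conjunctive: both sides are classified independently.
        Expr::And(l, r) => {
            extract_join_keys(*l, on_keys, filters)?;
            extract_join_keys(*r, on_keys, filters)
        }
        // An equality between two columns becomes a hash-join key pair; a real
        // implementation would also check that the columns come from opposite
        // sides of the join before treating them as keys.
        Expr::Eq(l, r) => {
            if matches!((&*l, &*r), (Expr::Column(_), Expr::Column(_))) {
                on_keys.push((*l, *r));
            } else {
                filters.push(Expr::Eq(l, r));
            }
            Ok(())
        }
        // A disjunction cannot be expressed as equi-keys; until `NestLoopJoin`
        // exists it is reported as unsupported instead of being silently
        // folded into the `And` bucket.
        Expr::Or(..) => Err("`NestLoopJoin` is not supported yet".to_string()),
        // Anything else is kept as a residual filter on the joined rows.
        other => {
            filters.push(other);
            Ok(())
        }
    }
}

fn main() {
    // Toy version of: ON t1.c1 = t2.c1 AND t1.c2 = t2.c2
    let on = Expr::And(
        Box::new(Expr::Eq(
            Box::new(Expr::Column("t1.c1")),
            Box::new(Expr::Column("t2.c1")),
        )),
        Box::new(Expr::Eq(
            Box::new(Expr::Column("t1.c2")),
            Box::new(Expr::Column("t2.c2")),
        )),
    );
    let (mut on_keys, mut filters) = (Vec::new(), Vec::new());
    extract_join_keys(on, &mut on_keys, &mut filters).unwrap();
    assert_eq!(on_keys.len(), 2);
    assert!(filters.is_empty());

    // ON t1.c1 = t2.c1 OR t1.c2 = t2.c2 is rejected until NestLoopJoin exists.
    let or_on = Expr::Or(
        Box::new(Expr::Eq(
            Box::new(Expr::Column("t1.c1")),
            Box::new(Expr::Column("t2.c1")),
        )),
        Box::new(Expr::Eq(
            Box::new(Expr::Column("t1.c2")),
            Box::new(Expr::Column("t2.c2")),
        )),
    );
    assert!(extract_join_keys(or_on, &mut Vec::new(), &mut Vec::new()).is_err());
}
```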

* docs: remove incorrect tips

* version update

* fix: incorrect `ConstantBinary::and_scope_aggregation` when `And` exists inside `Or`; ignore `ConstantBinary::Scope` whose lower bound is larger than its upper bound; when the same value appears in both `NotEq` and `Eq` inside an `Or`, convert to `Unbound`
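
Two of those rules are easy to picture with a small, hypothetical range type (not the actual `ConstantBinary`): a scope whose lower bound exceeds its upper bound can never match and is dropped, and `Eq(v) OR NotEq(v)` covers every value, which is exactly `Unbound`:

```rust
// Illustrative scope aggregation under `Or`; names and types are assumptions,
// not the real ConstantBinary implementation.
#[derive(Debug, PartialEq)]
enum OrRange {
    Eq(i64),
    NotEq(i64),
    Scope { min: i64, max: i64 },
    Unbound,
}

fn or_scope_aggregation(ranges: Vec<OrRange>) -> Vec<OrRange> {
    // `Eq(v) OR NotEq(v)` is a tautology: every value satisfies one side,
    // so the whole disjunction degenerates to Unbound.
    for r in &ranges {
        if let OrRange::Eq(v) = r {
            if ranges.contains(&OrRange::NotEq(*v)) {
                return vec![OrRange::Unbound];
            }
        }
    }
    ranges
        .into_iter()
        // A scope with min > max (e.g. produced by an `And` nested inside the
        // `Or`) matches nothing and is ignored rather than kept.
        .filter(|r| !matches!(r, OrRange::Scope { min, max } if min > max))
        .collect()
}

fn main() {
    assert_eq!(
        or_scope_aggregation(vec![OrRange::Eq(1), OrRange::NotEq(1)]),
        vec![OrRange::Unbound]
    );
    assert_eq!(
        or_scope_aggregation(vec![OrRange::Scope { min: 5, max: 3 }, OrRange::Eq(2)]),
        vec![OrRange::Eq(2)]
    );
}
```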
KKould authored Mar 2, 2024
1 parent 4f44d62 commit c9b76f1
Showing 19 changed files with 293 additions and 204 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -2,7 +2,7 @@

[package]
name = "fnck_sql"
version = "0.0.1-alpha.11"
version = "0.0.1-alpha.11.fix2"
edition = "2021"
authors = ["Kould <[email protected]>", "Xwg <[email protected]>"]
description = "Fast Insert OLTP SQL DBMS"
84 changes: 26 additions & 58 deletions src/binder/select.rs
@@ -26,7 +26,7 @@ use crate::planner::operator::sort::{SortField, SortOperator};
use crate::planner::operator::union::UnionOperator;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use crate::types::tuple::Schema;
use crate::types::tuple::{Schema, SchemaRef};
use crate::types::LogicalType;
use itertools::Itertools;
use sqlparser::ast::{
@@ -232,36 +232,24 @@ impl<'a, T: Transaction> Binder<'a, T> {
}

let TableWithJoins { relation, joins } = &from[0];
let mut plan = self.bind_single_table_ref(relation, None)?;

let (left_name, mut plan) = self.bind_single_table_ref(relation, None)?;

if !joins.is_empty() {
let left_name = Self::unpack_name(left_name, true);

for join in joins {
plan = self.bind_join(left_name.clone(), plan, join)?;
}
for join in joins {
plan = self.bind_join(plan, join)?;
}
Ok(plan)
}

fn unpack_name(table_name: Option<TableName>, is_left: bool) -> TableName {
let title = if is_left { "Left" } else { "Right" };
table_name.unwrap_or_else(|| panic!("{}: Table is not named", title))
}

fn bind_single_table_ref(
&mut self,
table: &TableFactor,
joint_type: Option<JoinType>,
) -> Result<(Option<TableName>, LogicalPlan), DatabaseError> {
let plan_with_name = match table {
) -> Result<LogicalPlan, DatabaseError> {
let plan = match table {
TableFactor::Table { name, alias, .. } => {
let table_name = lower_case_name(name)?;

let (table, plan) =
self._bind_single_table_ref(joint_type, &table_name, alias.as_ref())?;
(Some(table), plan)
self._bind_single_table_ref(joint_type, &table_name, alias.as_ref())?
}
TableFactor::Derived {
subquery, alias, ..
@@ -284,16 +272,13 @@ impl<'a, T: Transaction> Binder<'a, T> {
table_alias.to_string(),
tables.pop().unwrap(),
)?;

(Some(table_alias), plan)
} else {
((tables.len() > 1).then(|| tables.pop()).flatten(), plan)
}
plan
}
_ => unimplemented!(),
};

Ok(plan_with_name)
Ok(plan)
}

fn register_alias(
@@ -332,20 +317,17 @@ impl<'a, T: Transaction> Binder<'a, T> {
join_type: Option<JoinType>,
table: &str,
alias: Option<&TableAlias>,
) -> Result<(Arc<String>, LogicalPlan), DatabaseError> {
) -> Result<LogicalPlan, DatabaseError> {
let table_name = Arc::new(table.to_string());

let table_catalog = self.context.table_and_bind(table_name.clone(), join_type)?;
let scan_op = ScanOperator::build(table_name.clone(), table_catalog);

if let Some(TableAlias { name, columns }) = alias {
let alias = lower_ident(name);
self.register_alias(columns, alias.clone(), table_name.clone())?;

return Ok((Arc::new(alias), scan_op));
self.register_alias(columns, lower_ident(name), table_name.clone())?;
}

Ok((table_name, scan_op))
Ok(scan_op)
}

/// Normalize select item.
@@ -428,8 +410,7 @@ impl<'a, T: Transaction> Binder<'a, T> {

fn bind_join(
&mut self,
left_table: TableName,
left: LogicalPlan,
mut left: LogicalPlan,
join: &Join,
) -> Result<LogicalPlan, DatabaseError> {
let Join {
@@ -445,11 +426,12 @@ impl<'a, T: Transaction> Binder<'a, T> {
JoinOperator::CrossJoin => (JoinType::Cross, None),
_ => unimplemented!(),
};
let (right_table, right) = self.bind_single_table_ref(relation, Some(join_type))?;
let right_table = Self::unpack_name(right_table, false);
let mut right = self.bind_single_table_ref(relation, Some(join_type))?;

let on = match joint_condition {
Some(constraint) => self.bind_join_constraint(left_table, right_table, constraint)?,
Some(constraint) => {
self.bind_join_constraint(left.output_schema(), right.output_schema(), constraint)?
}
None => JoinCondition::None,
};

@@ -639,8 +621,8 @@ impl<'a, T: Transaction> Binder<'a, T> {

fn bind_join_constraint(
&mut self,
left_table: TableName,
right_table: TableName,
left_schema: &SchemaRef,
right_schema: &SchemaRef,
constraint: &JoinConstraint,
) -> Result<JoinCondition, DatabaseError> {
match constraint {
@@ -651,21 +633,12 @@ impl<'a, T: Transaction> Binder<'a, T> {
let mut filter = vec![];
let expr = self.bind_expr(expr)?;

let left_table = self
.context
.table(left_table)
.ok_or(DatabaseError::TableNotFound)?;
let right_table = self
.context
.table(right_table)
.ok_or(DatabaseError::TableNotFound)?;

Self::extract_join_keys(
expr,
&mut on_keys,
&mut filter,
left_table.schema_ref(),
right_table.schema_ref(),
left_schema,
right_schema,
)?;

// combine multiple filter exprs into one BinaryExpr
@@ -691,19 +664,11 @@ impl<'a, T: Transaction> Binder<'a, T> {
.find(|column| column.name() == lower_ident(ident))
.map(|column| ScalarExpression::ColumnRef(column.clone()))
};
let left_table = self
.context
.table(left_table)
.ok_or(DatabaseError::TableNotFound)?;
let right_table = self
.context
.table(right_table)
.ok_or(DatabaseError::TableNotFound)?;

for ident in idents {
if let (Some(left_column), Some(right_column)) = (
fn_column(left_table.schema_ref(), ident),
fn_column(right_table.schema_ref(), ident),
fn_column(left_schema, ident),
fn_column(right_schema, ident),
) {
on_keys.push((left_column, right_column));
} else {
@@ -823,6 +788,9 @@ impl<'a, T: Transaction> Binder<'a, T> {
right_schema,
)?;
}
BinaryOperator::Or => {
todo!("`NestLoopJoin` is not supported yet")
}
_ => {
if left_expr.referenced_columns(true).iter().all(|column| {
fn_or_contains(left_schema, right_schema, column.summary())
8 changes: 4 additions & 4 deletions src/execution/volcano/dql/index_scan.rs
@@ -10,15 +10,15 @@ use futures_async_stream::try_stream;
pub(crate) struct IndexScan {
op: ScanOperator,
index_by: IndexMetaRef,
binaries: Vec<ConstantBinary>,
ranges: Vec<ConstantBinary>,
}

impl From<(ScanOperator, IndexMetaRef, Vec<ConstantBinary>)> for IndexScan {
fn from((op, index_by, binaries): (ScanOperator, IndexMetaRef, Vec<ConstantBinary>)) -> Self {
fn from((op, index_by, ranges): (ScanOperator, IndexMetaRef, Vec<ConstantBinary>)) -> Self {
IndexScan {
op,
index_by,
binaries,
ranges,
}
}
}
@@ -39,7 +39,7 @@ impl IndexScan {
..
} = self.op;
let mut iter =
transaction.read_by_index(table_name, limit, columns, self.index_by, self.binaries)?;
transaction.read_by_index(table_name, limit, columns, self.index_by, self.ranges)?;

while let Some(tuple) = iter.next_tuple()? {
yield tuple;
5 changes: 5 additions & 0 deletions src/execution/volcano/dql/join/hash_join.rs
@@ -72,6 +72,11 @@ impl HashJoinStatus {
JoinCondition::On { on, filter } => (on.into_iter().unzip(), filter),
JoinCondition::None => unreachable!("HashJoin must has on condition"),
};
if on_left_keys.is_empty() || on_right_keys.is_empty() {
todo!("`NestLoopJoin` should be used when there is no equivalent condition")
}
assert!(!on_left_keys.is_empty());
assert!(!on_right_keys.is_empty());

let fn_process = |schema: &mut Vec<ColumnRef>, force_nullable| {
for column in schema.iter_mut() {
Expand Down
4 changes: 2 additions & 2 deletions src/execution/volcano/mod.rs
@@ -84,10 +84,10 @@ pub fn build_read<T: Transaction>(plan: LogicalPlan, transaction: &T) -> BoxedEx
Operator::Scan(op) => {
if let Some(PhysicalOption::IndexScan(IndexInfo {
meta,
binaries: Some(binaries),
ranges: Some(ranges),
})) = plan.physical_option
{
IndexScan::from((op, meta, binaries)).execute(transaction)
IndexScan::from((op, meta, ranges)).execute(transaction)
} else {
SeqScan::from(op).execute(transaction)
}