Skip to content

Commit

Permalink
feat: support more join case
Browse files Browse the repository at this point in the history
- reconstruct the alias mechanism to ensure correctness during Join
- support Natural Join
- support multiple from(to Cross Join)
- when Using and Natural Join, the same columns are automatically removed (Fixme: JoinType needs to be supported to decide whether to use the columns of the left table or the right table)
- `RangeDetacher::detach` removes meaningless `Result`
  • Loading branch information
KKould committed Apr 10, 2024
1 parent 74ea950 commit 7c0b71f
Show file tree
Hide file tree
Showing 15 changed files with 1,380 additions and 184 deletions.
4 changes: 3 additions & 1 deletion src/binder/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ impl<'a, T: Transaction> Binder<'a, T> {
pub(crate) fn bind_analyze(&mut self, name: &ObjectName) -> Result<LogicalPlan, DatabaseError> {
let table_name = Arc::new(lower_case_name(name)?);

let table_catalog = self.context.table_and_bind(table_name.clone(), None)?;
let table_catalog = self
.context
.table_and_bind(table_name.clone(), None, None)?;
let index_metas = table_catalog.indexes.clone();

let scan_op = ScanOperator::build(table_name.clone(), table_catalog);
Expand Down
4 changes: 3 additions & 1 deletion src/binder/create_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ impl<'a, T: Transaction> Binder<'a, T> {
IndexType::Composite
};

let table = self.context.table_and_bind(table_name.clone(), None)?;
let table = self
.context
.table_and_bind(table_name.clone(), None, None)?;
let plan = ScanOperator::build(table_name.clone(), table);
let mut columns = Vec::with_capacity(exprs.len());

Expand Down
18 changes: 13 additions & 5 deletions src/binder/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::planner::operator::scan::ScanOperator;
use crate::planner::operator::Operator;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use sqlparser::ast::{Expr, TableFactor, TableWithJoins};
use sqlparser::ast::{Expr, TableAlias, TableFactor, TableWithJoins};
use std::sync::Arc;

impl<'a, T: Transaction> Binder<'a, T> {
Expand All @@ -16,18 +16,26 @@ impl<'a, T: Transaction> Binder<'a, T> {
) -> Result<LogicalPlan, DatabaseError> {
if let TableFactor::Table { name, alias, .. } = &from.relation {
let table_name = Arc::new(lower_case_name(name)?);
let mut table_alias = None;
let mut alias_idents = None;

let table_catalog = self.context.table_and_bind(table_name.clone(), None)?;
if let Some(TableAlias { name, columns }) = alias {
table_alias = Some(Arc::new(name.value.to_lowercase()));
alias_idents = Some(columns);
}
let table_catalog =
self.context
.table_and_bind(table_name.clone(), table_alias.clone(), None)?;
let primary_key_column = table_catalog
.columns()
.find(|column| column.desc.is_primary)
.cloned()
.unwrap();
let mut plan = ScanOperator::build(table_name.clone(), table_catalog);

if let Some(alias) = alias {
self.context
.add_table_alias(alias.to_string(), table_name.clone());
if let Some(alias_idents) = alias_idents {
plan =
self.bind_alias(plan, alias_idents, table_alias.unwrap(), table_name.clone())?;
}

if let Some(predicate) = selection {
Expand Down
45 changes: 29 additions & 16 deletions src/binder/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ use crate::types::value::{DataValue, Utf8Type};
use crate::types::LogicalType;

macro_rules! try_alias {
($context:expr, $column_name:expr) => {
if let Some(expr) = $context.expr_aliases.get(&$column_name) {
($context:expr, $full_name:expr) => {
if let Some(expr) = $context.expr_aliases.get(&$full_name) {
return Ok(ScalarExpression::Alias {
expr: Box::new(expr.clone()),
alias: AliasType::Name($column_name),
alias: AliasType::Name($full_name.1),
});
}
};
Expand Down Expand Up @@ -283,7 +283,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
idents: &[Ident],
bind_table_name: Option<String>,
) -> Result<ScalarExpression, DatabaseError> {
let (table_name, column_name) = match idents {
let full_name = match idents {
[column] => (None, lower_ident(column)),
[table, column] => (Some(lower_ident(table)), lower_ident(column)),
_ => {
Expand All @@ -296,25 +296,40 @@ impl<'a, T: Transaction> Binder<'a, T> {
))
}
};
try_alias!(self.context, column_name);
try_alias!(self.context, full_name);
if self.context.allow_default {
try_default!(&table_name, column_name);
try_default!(&full_name.0, full_name.1);
}
if let Some(table) = table_name.or(bind_table_name) {
if let Some(table) = full_name.0.or(bind_table_name) {
let table_catalog = self.context.bind_table(&table, self.parent)?;

let column_catalog = table_catalog
.get_column_by_name(&column_name)
.ok_or_else(|| DatabaseError::NotFound("column", column_name))?;
.get_column_by_name(&full_name.1)
.ok_or_else(|| DatabaseError::NotFound("column", full_name.1))?;
Ok(ScalarExpression::ColumnRef(column_catalog.clone()))
} else {
let op = |got_column: &mut Option<&'a ColumnRef>, context: &BinderContext<'a, T>| {
for table_catalog in context.bind_table.values() {
let op = |got_column: &mut Option<ScalarExpression>, context: &BinderContext<'a, T>| {
for ((_, alias, _), table_catalog) in context.bind_table.iter() {
if got_column.is_some() {
break;
}
if let Some(column_catalog) = table_catalog.get_column_by_name(&column_name) {
*got_column = Some(column_catalog);
if let Some(alias) = alias {
*got_column = self.context.expr_aliases.iter().find_map(
|((alias_table, alias_column), expr)| {
matches!(
alias_table
.as_ref()
.map(|table_name| table_name == alias.as_ref()
&& alias_column == &full_name.1),
Some(true)
)
.then(|| expr.clone())
},
);
} else if let Some(column_catalog) =
table_catalog.get_column_by_name(&full_name.1)
{
*got_column = Some(ScalarExpression::ColumnRef(column_catalog.clone()));
}
}
};
Expand All @@ -325,9 +340,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
if let Some(parent) = self.parent {
op(&mut got_column, &parent.context);
}
let column_catalog =
got_column.ok_or_else(|| DatabaseError::NotFound("column", column_name))?;
Ok(ScalarExpression::ColumnRef(column_catalog.clone()))
Ok(got_column.ok_or_else(|| DatabaseError::NotFound("column", full_name.1))?)
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/binder/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ impl<'a, T: Transaction> Binder<'a, T> {
self.context.allow_default = true;
let table_name = Arc::new(lower_case_name(name)?);

let table = self.context.table_and_bind(table_name.clone(), None)?;
let table = self
.context
.table_and_bind(table_name.clone(), None, None)?;
let mut _schema_ref = None;
let values_len = expr_rows[0].len();

Expand Down
36 changes: 26 additions & 10 deletions src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod show;
mod truncate;
mod update;

use ahash::HashSet;
use sqlparser::ast::{Ident, ObjectName, ObjectType, SetExpr, Statement};
use std::collections::{BTreeMap, HashMap};
use std::sync::atomic::{AtomicUsize, Ordering};
Expand Down Expand Up @@ -85,13 +86,16 @@ pub struct BinderContext<'a, T: Transaction> {
pub(crate) functions: &'a Functions,
pub(crate) transaction: &'a T,
// Tips: When there are multiple tables and Wildcard, use BTreeMap to ensure that the order of the output tables is certain.
pub(crate) bind_table: BTreeMap<(TableName, Option<JoinType>), &'a TableCatalog>,
pub(crate) bind_table:
BTreeMap<(TableName, Option<TableName>, Option<JoinType>), &'a TableCatalog>,
// alias
expr_aliases: HashMap<String, ScalarExpression>,
table_aliases: HashMap<String, TableName>,
expr_aliases: BTreeMap<(Option<String>, String), ScalarExpression>,
table_aliases: HashMap<TableName, TableName>,
// agg
group_by_exprs: Vec<ScalarExpression>,
pub(crate) agg_calls: Vec<ScalarExpression>,
// join
using: HashSet<String>,

bind_step: QueryBindStep,
sub_queries: HashMap<QueryBindStep, Vec<SubQueryType>>,
Expand All @@ -114,6 +118,7 @@ impl<'a, T: Transaction> BinderContext<'a, T> {
table_aliases: Default::default(),
group_by_exprs: vec![],
agg_calls: Default::default(),
using: Default::default(),
bind_step: QueryBindStep::From,
sub_queries: Default::default(),
temp_table_id,
Expand Down Expand Up @@ -162,6 +167,7 @@ impl<'a, T: Transaction> BinderContext<'a, T> {
pub fn table_and_bind(
&mut self,
table_name: TableName,
alias: Option<TableName>,
join_type: Option<JoinType>,
) -> Result<&TableCatalog, DatabaseError> {
let table = if let Some(real_name) = self.table_aliases.get(table_name.as_ref()) {
Expand All @@ -172,7 +178,7 @@ impl<'a, T: Transaction> BinderContext<'a, T> {
.ok_or(DatabaseError::TableNotFound)?;

self.bind_table
.insert((table_name.clone(), join_type), table);
.insert((table_name.clone(), alias, join_type), table);

Ok(table)
}
Expand All @@ -183,9 +189,10 @@ impl<'a, T: Transaction> BinderContext<'a, T> {
table_name: &str,
parent: Option<&'a Binder<'a, T>>,
) -> Result<&TableCatalog, DatabaseError> {
let default_name = Arc::new(table_name.to_owned());
let real_name = self.table_aliases.get(table_name).unwrap_or(&default_name);
if let Some(table_catalog) = self.bind_table.iter().find(|((t, _), _)| t == real_name) {
if let Some(table_catalog) = self.bind_table.iter().find(|((t, alias, _), _)| {
t.as_str() == table_name
|| matches!(alias.as_ref().map(|a| a.as_str() == table_name), Some(true))
}) {
Ok(table_catalog.1)
} else if let Some(binder) = parent {
binder.context.bind_table(table_name, binder.parent)
Expand All @@ -202,11 +209,20 @@ impl<'a, T: Transaction> BinderContext<'a, T> {
}
}

pub fn add_alias(&mut self, alias: String, expr: ScalarExpression) {
self.expr_aliases.insert(alias, expr);
pub fn add_using(&mut self, name: String) {
self.using.insert(name);
}

pub fn add_table_alias(&mut self, alias: String, table: TableName) {
pub fn add_alias(
&mut self,
alias_table: Option<String>,
alias_column: String,
expr: ScalarExpression,
) {
self.expr_aliases.insert((alias_table, alias_column), expr);
}

pub fn add_table_alias(&mut self, alias: TableName, table: TableName) {
self.table_aliases.insert(alias.clone(), table.clone());
}

Expand Down
Loading

0 comments on commit 7c0b71f

Please sign in to comment.