Skip to content

Commit

Permalink
fix: In multi-level nested subqueries, the field relationships of different tables are wrong
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Mar 20, 2024
1 parent 917804e commit 9073845
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 14 deletions.
7 changes: 6 additions & 1 deletion src/binder/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ mod tests {
use crate::storage::kip::KipStorage;
use crate::storage::Storage;
use crate::types::LogicalType;
use std::sync::atomic::AtomicUsize;
use tempfile::TempDir;

#[tokio::test]
Expand All @@ -152,7 +153,11 @@ mod tests {
let functions = Default::default();

let sql = "create table t1 (id int primary key, name varchar(10) null)";
let mut binder = Binder::new(BinderContext::new(&transaction, &functions));
let mut binder = Binder::new(BinderContext::new(
&transaction,
&functions,
Arc::new(AtomicUsize::new(0)),
));
let stmt = crate::parser::parse_sql(sql).unwrap();
let plan1 = binder.bind(&stmt[0]).unwrap();

Expand Down
17 changes: 15 additions & 2 deletions src/binder/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use sqlparser::ast::{
use std::slice;
use std::sync::Arc;

use super::{lower_ident, Binder, QueryBindStep, SubQueryType};
use super::{lower_ident, Binder, BinderContext, QueryBindStep, SubQueryType};
use crate::expression::function::{FunctionSummary, ScalarFunction};
use crate::expression::{AliasType, ScalarExpression};
use crate::planner::LogicalPlan;
Expand Down Expand Up @@ -226,7 +226,19 @@ impl<'a, T: Transaction> Binder<'a, T> {
&mut self,
subquery: &Query,
) -> Result<(LogicalPlan, Arc<ColumnCatalog>), DatabaseError> {
let mut sub_query = self.bind_query(subquery)?;
let BinderContext {
transaction,
functions,
temp_table_id,
..
} = &self.context;
let mut binder = Binder::new(BinderContext::new(
*transaction,
functions,
temp_table_id.clone(),
));

let mut sub_query = binder.bind_query(subquery)?;
let sub_query_schema = sub_query.output_schema();

if sub_query_schema.len() != 1 {
Expand All @@ -236,6 +248,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
));
}
let column = sub_query_schema[0].clone();
self.context.merge_context(binder.context)?;
Ok((sub_query, column))
}

Expand Down
53 changes: 46 additions & 7 deletions src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod update;

use sqlparser::ast::{Ident, ObjectName, ObjectType, SetExpr, Statement};
use std::collections::{BTreeMap, HashMap};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use crate::catalog::{TableCatalog, TableName};
Expand Down Expand Up @@ -55,7 +56,7 @@ pub enum SubQueryType {

#[derive(Clone)]
pub struct BinderContext<'a, T: Transaction> {
functions: &'a Functions,
pub(crate) functions: &'a Functions,
pub(crate) transaction: &'a T,
// Tips: When there are multiple tables and Wildcard, use BTreeMap to ensure that the order of the output tables is certain.
pub(crate) bind_table: BTreeMap<(TableName, Option<JoinType>), &'a TableCatalog>,
Expand All @@ -69,12 +70,16 @@ pub struct BinderContext<'a, T: Transaction> {
bind_step: QueryBindStep,
sub_queries: HashMap<QueryBindStep, Vec<SubQueryType>>,

temp_table_id: usize,
temp_table_id: Arc<AtomicUsize>,
pub(crate) allow_default: bool,
}

impl<'a, T: Transaction> BinderContext<'a, T> {
pub fn new(transaction: &'a T, functions: &'a Functions) -> Self {
pub fn new(
transaction: &'a T,
functions: &'a Functions,
temp_table_id: Arc<AtomicUsize>,
) -> Self {
BinderContext {
functions,
transaction,
Expand All @@ -85,14 +90,16 @@ impl<'a, T: Transaction> BinderContext<'a, T> {
agg_calls: Default::default(),
bind_step: QueryBindStep::From,
sub_queries: Default::default(),
temp_table_id: 0,
temp_table_id,
allow_default: false,
}
}

/// Allocates a fresh, process-unique temporary table name.
///
/// The counter is a shared `Arc<AtomicUsize>`, so every binder created from
/// the same context chain (e.g. nested subquery binders) draws from one
/// sequence and names can never collide.
pub fn temp_table(&mut self) -> TableName {
    // fetch_add returns the pre-increment value; SeqCst keeps the
    // sequence totally ordered across binders sharing the counter.
    let id = self.temp_table_id.fetch_add(1, Ordering::SeqCst);
    Arc::new(format!("_temp_table_{}_", id))
}

pub fn step(&mut self, bind_step: QueryBindStep) {
Expand Down Expand Up @@ -163,6 +170,33 @@ impl<'a, T: Transaction> BinderContext<'a, T> {
pub fn has_agg_call(&self, expr: &ScalarExpression) -> bool {
self.group_by_exprs.contains(expr)
}

/// Folds a child binder's context (e.g. from a bound subquery) back into `self`.
///
/// Expression and table aliases must not collide with ones already present;
/// a collision yields `DuplicateAliasExpr` / `DuplicateAliasTable`.
/// Bound tables are merged unconditionally — an entry with the same key
/// replaces the existing one.
pub fn merge_context(&mut self, context: BinderContext<'a, T>) -> Result<(), DatabaseError> {
    // Move each map out of the child context individually (partial move).
    for (alias, expr) in context.expr_aliases {
        if self.expr_aliases.contains_key(&alias) {
            return Err(DatabaseError::DuplicateAliasExpr(alias));
        }
        self.expr_aliases.insert(alias, expr);
    }

    for (alias, table_name) in context.table_aliases {
        if self.table_aliases.contains_key(&alias) {
            return Err(DatabaseError::DuplicateAliasTable(alias));
        }
        self.table_aliases.insert(alias, table_name);
    }

    // No duplicate check here: later bindings intentionally overwrite.
    self.bind_table.extend(context.bind_table);

    Ok(())
}
}

pub struct Binder<'a, T: Transaction> {
Expand Down Expand Up @@ -305,6 +339,7 @@ pub mod test {
use crate::storage::{Storage, Transaction};
use crate::types::LogicalType::Integer;
use std::path::PathBuf;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use tempfile::TempDir;

Expand Down Expand Up @@ -358,7 +393,11 @@ pub mod test {
let storage = build_test_catalog(temp_dir.path()).await?;
let transaction = storage.transaction().await?;
let functions = Default::default();
let mut binder = Binder::new(BinderContext::new(&transaction, &functions));
let mut binder = Binder::new(BinderContext::new(
&transaction,
&functions,
Arc::new(AtomicUsize::new(0)),
));
let stmt = crate::parser::parse_sql(sql)?;

Ok(binder.bind(&stmt[0])?)
Expand Down
7 changes: 6 additions & 1 deletion src/db.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use ahash::HashMap;
use std::path::PathBuf;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

use crate::binder::{Binder, BinderContext};
Expand Down Expand Up @@ -98,7 +99,11 @@ impl<S: Storage> Database<S> {
if stmts.is_empty() {
return Err(DatabaseError::EmptyStatement);
}
let mut binder = Binder::new(BinderContext::new(transaction, functions));
let mut binder = Binder::new(BinderContext::new(
transaction,
functions,
Arc::new(AtomicUsize::new(0)),
));
/// Build a logical plan.
///
/// SELECT a,b FROM t1 ORDER BY a LIMIT 1;
Expand Down
8 changes: 6 additions & 2 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,16 @@ pub enum DatabaseError {
#[source]
csv::Error,
),
#[error("duplicate primary key")]
DuplicatePrimaryKey,
#[error("alias expr: {0} already exists")]
DuplicateAliasExpr(String),
#[error("alias table: {0} already exists")]
DuplicateAliasTable(String),
#[error("column: {0} already exists")]
DuplicateColumn(String),
#[error("index: {0} already exists")]
DuplicateIndex(String),
#[error("duplicate primary key")]
DuplicatePrimaryKey,
#[error("the column has been declared unique and the value already exists")]
DuplicateUniqueValue,
#[error("empty plan")]
Expand Down
7 changes: 6 additions & 1 deletion src/optimizer/core/memo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ mod tests {
use crate::types::LogicalType;
use petgraph::stable_graph::NodeIndex;
use std::ops::Bound;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use tempfile::TempDir;

Expand All @@ -121,7 +122,11 @@ mod tests {

let transaction = database.storage.transaction().await?;
let functions = Default::default();
let mut binder = Binder::new(BinderContext::new(&transaction, &functions));
let mut binder = Binder::new(BinderContext::new(
&transaction,
&functions,
Arc::new(AtomicUsize::new(0)),
));
let stmt = crate::parser::parse_sql(
// FIXME: Only by bracketing (c1 > 40 or c1 = 2) can the filter be pushed down below the join
"select c1, c3 from t1 inner join t2 on c1 = c3 where (c1 > 40 or c1 = 2) and c3 > 22",
Expand Down

0 comments on commit 9073845

Please sign in to comment.