Skip to content

Commit

Permalink
Support Subquery on WHERE with IN/Not IN (#147)
Browse files Browse the repository at this point in the history
* invalid: fix index's `tuple_ids` -> `tuple_id`

* refactor: remove Tuple‘s Schema

* fix: `TestFunction`

* feat: support `LeftSemi` & `LeftAnti`

* feat: support Subquery on `WHERE` with `IN/Not IN`

* style: remove `Transaction::drop_column`'s `if_not_exists`

* test: more case for `insubquery`

* code fmt
  • Loading branch information
KKould authored Feb 28, 2024
1 parent e8fc5cd commit 5778712
Show file tree
Hide file tree
Showing 60 changed files with 796 additions and 1,008 deletions.
9 changes: 4 additions & 5 deletions examples/hello_world.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use fnck_sql::db::DataBaseBuilder;
use fnck_sql::errors::DatabaseError;
use fnck_sql::implement_from_tuple;
use fnck_sql::types::tuple::Tuple;
use fnck_sql::types::tuple::{SchemaRef, Tuple};
use fnck_sql::types::value::DataValue;
use fnck_sql::types::LogicalType;
use itertools::Itertools;
Expand Down Expand Up @@ -38,11 +38,10 @@ async fn main() -> Result<(), DatabaseError> {
let _ = database
.run("insert into my_struct values(0, 0), (1, 1)")
.await?;
let tuples = database
.run("select * from my_struct")
.await?
let (schema, tuples) = database.run("select * from my_struct").await?;
let tuples = tuples
.into_iter()
.map(MyStruct::from)
.map(|tuple| MyStruct::from((&schema, tuple)))
.collect_vec();

println!("{:#?}", tuples);
Expand Down
11 changes: 5 additions & 6 deletions src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use clap::Parser;
use fnck_sql::db::{DBTransaction, DataBaseBuilder, Database};
use fnck_sql::errors::DatabaseError;
use fnck_sql::storage::kip::KipStorage;
use fnck_sql::types::tuple::Tuple;
use fnck_sql::types::tuple::{Schema, Tuple};
use fnck_sql::types::LogicalType;
use futures::stream;
use log::{error, info, LevelFilter};
Expand Down Expand Up @@ -146,28 +146,27 @@ impl SimpleQueryHandler for SessionBackend {
_ => {
let mut guard = self.tx.lock().await;

let tuples = if let Some(transaction) = guard.as_mut() {
let (schema, tuples) = if let Some(transaction) = guard.as_mut() {
transaction.run(query).await
} else {
self.inner.run(query).await
}
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;

Ok(vec![Response::Query(encode_tuples(tuples)?)])
Ok(vec![Response::Query(encode_tuples(&schema, tuples)?)])
}
}
}
}

fn encode_tuples<'a>(tuples: Vec<Tuple>) -> PgWireResult<QueryResponse<'a>> {
fn encode_tuples<'a>(schema: &Schema, tuples: Vec<Tuple>) -> PgWireResult<QueryResponse<'a>> {
if tuples.is_empty() {
return Ok(QueryResponse::new(Arc::new(vec![]), stream::empty()));
}

let mut results = Vec::with_capacity(tuples.len());
let schema = Arc::new(
tuples[0]
.schema_ref
schema
.iter()
.map(|column| {
let pg_type = into_pg_type(column.datatype())?;
Expand Down
2 changes: 1 addition & 1 deletion src/binder/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
return Ok(());
}
if matches!(expr, ScalarExpression::Alias { .. }) {
return self.validate_having_orderby(expr.unpack_alias());
return self.validate_having_orderby(expr.unpack_alias_ref());
}

Err(DatabaseError::AggMiss(
Expand Down
86 changes: 62 additions & 24 deletions src/binder/expr.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
use crate::catalog::ColumnCatalog;
use crate::catalog::{ColumnCatalog, ColumnRef};
use crate::errors::DatabaseError;
use crate::expression;
use crate::expression::agg::AggKind;
use itertools::Itertools;
use sqlparser::ast::{
BinaryOperator, DataType, Expr, Function, FunctionArg, FunctionArgExpr, Ident, UnaryOperator,
BinaryOperator, DataType, Expr, Function, FunctionArg, FunctionArgExpr, Ident, Query,
UnaryOperator,
};
use std::slice;
use std::sync::Arc;

use super::{lower_ident, Binder, QueryBindStep};
use super::{lower_ident, Binder, QueryBindStep, SubQueryType};
use crate::expression::function::{FunctionSummary, ScalarFunction};
use crate::expression::{AliasType, ScalarExpression};
use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use crate::types::value::DataValue;
use crate::types::LogicalType;
Expand Down Expand Up @@ -99,33 +101,40 @@ impl<'a, T: Transaction> Binder<'a, T> {
from_expr,
})
}
Expr::Subquery(query) => {
let mut sub_query = self.bind_query(query)?;
let sub_query_schema = sub_query.output_schema();

if sub_query_schema.len() != 1 {
return Err(DatabaseError::MisMatch(
"expects only one expression to be returned",
"the expression returned by the subquery",
));
}
let column = sub_query_schema[0].clone();
self.context.sub_query(sub_query);
Expr::Subquery(subquery) => {
let (sub_query, column) = self.bind_subquery(subquery)?;
self.context.sub_query(SubQueryType::SubQuery(sub_query));

if self.context.is_step(&QueryBindStep::Where) {
let mut alias_column = ColumnCatalog::clone(&column);
alias_column.set_table_name(self.context.temp_table());

Ok(ScalarExpression::Alias {
expr: Box::new(ScalarExpression::ColumnRef(column)),
alias: AliasType::Expr(Box::new(ScalarExpression::ColumnRef(Arc::new(
alias_column,
)))),
})
Ok(self.bind_temp_column(column))
} else {
Ok(ScalarExpression::ColumnRef(column))
}
}
Expr::InSubquery {
expr,
subquery,
negated,
} => {
let (sub_query, column) = self.bind_subquery(subquery)?;
self.context
.sub_query(SubQueryType::InSubQuery(*negated, sub_query));

if !self.context.is_step(&QueryBindStep::Where) {
return Err(DatabaseError::UnsupportedStmt(
"`in subquery` can only appear in `Where`".to_string(),
));
}

let alias_expr = self.bind_temp_column(column);

Ok(ScalarExpression::Binary {
op: expression::BinaryOperator::Eq,
left_expr: Box::new(self.bind_expr(expr)?),
right_expr: Box::new(alias_expr),
ty: LogicalType::Boolean,
})
}
Expr::Tuple(exprs) => {
let mut bond_exprs = Vec::with_capacity(exprs.len());

Expand Down Expand Up @@ -187,6 +196,35 @@ impl<'a, T: Transaction> Binder<'a, T> {
}
}

fn bind_temp_column(&mut self, column: ColumnRef) -> ScalarExpression {
let mut alias_column = ColumnCatalog::clone(&column);
alias_column.set_table_name(self.context.temp_table());

ScalarExpression::Alias {
expr: Box::new(ScalarExpression::ColumnRef(column)),
alias: AliasType::Expr(Box::new(ScalarExpression::ColumnRef(Arc::new(
alias_column,
)))),
}
}

fn bind_subquery(
&mut self,
subquery: &Query,
) -> Result<(LogicalPlan, Arc<ColumnCatalog>), DatabaseError> {
let mut sub_query = self.bind_query(subquery)?;
let sub_query_schema = sub_query.output_schema();

if sub_query_schema.len() != 1 {
return Err(DatabaseError::MisMatch(
"expects only one expression to be returned",
"the expression returned by the subquery",
));
}
let column = sub_query_schema[0].clone();
Ok((sub_query, column))
}

pub fn bind_like(
&mut self,
negated: bool,
Expand Down
16 changes: 13 additions & 3 deletions src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ pub enum QueryBindStep {
Limit,
}

#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub enum SubQueryType {
SubQuery(LogicalPlan),
InSubQuery(bool, LogicalPlan),
}

#[derive(Clone)]
pub struct BinderContext<'a, T: Transaction> {
functions: &'a Functions,
Expand All @@ -60,7 +66,7 @@ pub struct BinderContext<'a, T: Transaction> {
pub(crate) agg_calls: Vec<ScalarExpression>,

bind_step: QueryBindStep,
sub_queries: HashMap<QueryBindStep, Vec<LogicalPlan>>,
sub_queries: HashMap<QueryBindStep, Vec<SubQueryType>>,

temp_table_id: usize,
pub(crate) allow_default: bool,
Expand Down Expand Up @@ -96,14 +102,18 @@ impl<'a, T: Transaction> BinderContext<'a, T> {
&self.bind_step == bind_step
}

pub fn sub_query(&mut self, sub_query: LogicalPlan) {
pub fn step_now(&self) -> QueryBindStep {
self.bind_step
}

pub fn sub_query(&mut self, sub_query: SubQueryType) {
self.sub_queries
.entry(self.bind_step)
.or_default()
.push(sub_query)
}

pub fn sub_queries_at_now(&mut self) -> Option<Vec<LogicalPlan>> {
pub fn sub_queries_at_now(&mut self) -> Option<Vec<SubQueryType>> {
self.sub_queries.remove(&self.bind_step)
}

Expand Down
Loading

0 comments on commit 5778712

Please sign in to comment.