Skip to content

Commit

Permalink
feat: Support Subquery on Where (#136)
Browse files Browse the repository at this point in the history
* feat: support `Subquery` on Where

* fix: add alias for subquery

* fix: parameter checking of subquery in Where
  • Loading branch information
KKould authored Feb 11, 2024
1 parent 5f8b04b commit 86aaf60
Show file tree
Hide file tree
Showing 65 changed files with 768 additions and 425 deletions.
2 changes: 1 addition & 1 deletion src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ fn encode_tuples<'a>(tuples: Vec<Tuple>) -> PgWireResult<QueryResponse<'a>> {
let mut results = Vec::with_capacity(tuples.len());
let schema = Arc::new(
tuples[0]
.columns
.schema_ref
.iter()
.map(|column| {
let pg_type = into_pg_type(column.datatype())?;
Expand Down
8 changes: 6 additions & 2 deletions src/binder/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
planner::operator::{aggregate::AggregateOperator, sort::SortField},
};

use super::Binder;
use super::{Binder, QueryBindStep};

impl<'a, T: Transaction> Binder<'a, T> {
pub fn bind_aggregate(
Expand All @@ -20,6 +20,8 @@ impl<'a, T: Transaction> Binder<'a, T> {
agg_calls: Vec<ScalarExpression>,
groupby_exprs: Vec<ScalarExpression>,
) -> LogicalPlan {
self.context.step(QueryBindStep::Agg);

AggregateOperator::build(children, agg_calls, groupby_exprs)
}

Expand Down Expand Up @@ -133,7 +135,8 @@ impl<'a, T: Transaction> Binder<'a, T> {
self.visit_column_agg_expr(expr)?;
}
}
ScalarExpression::Constant(_) | ScalarExpression::ColumnRef { .. } => {}
ScalarExpression::Constant(_) | ScalarExpression::ColumnRef { .. } => (),
ScalarExpression::Empty => unreachable!(),
}

Ok(())
Expand Down Expand Up @@ -306,6 +309,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
Ok(())
}
ScalarExpression::Constant(_) => Ok(()),
ScalarExpression::Empty => unreachable!(),
}
}
}
18 changes: 8 additions & 10 deletions src/binder/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,14 @@ impl<'a, T: Transaction> Binder<'a, T> {
"illegal column naming".to_string(),
));
}
LogicalPlan {
operator: Operator::AddColumn(AddColumnOperator {
LogicalPlan::new(
Operator::AddColumn(AddColumnOperator {
table_name,
if_not_exists: *if_not_exists,
column,
}),
childrens: vec![plan],
physical_option: None,
}
vec![plan],
)
}
AlterTableOperation::DropColumn {
column_name,
Expand All @@ -53,15 +52,14 @@ impl<'a, T: Transaction> Binder<'a, T> {
let plan = ScanOperator::build(table_name.clone(), table);
let column_name = column_name.value.clone();

LogicalPlan {
operator: Operator::DropColumn(DropColumnOperator {
LogicalPlan::new(
Operator::DropColumn(DropColumnOperator {
table_name,
if_exists: *if_exists,
column_name,
}),
childrens: vec![plan],
physical_option: None,
}
vec![plan],
)
}
AlterTableOperation::DropPrimaryKey => todo!(),
AlterTableOperation::RenameColumn {
Expand Down
14 changes: 6 additions & 8 deletions src/binder/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,17 @@ impl<'a, T: Transaction> Binder<'a, T> {

let table_catalog = self.context.table_and_bind(table_name.clone(), None)?;
let columns = table_catalog
.columns_with_id()
.filter_map(|(_, column)| column.desc.is_index().then_some(column.clone()))
.columns()
.filter_map(|column| column.desc.is_index().then_some(column.clone()))
.collect_vec();

let scan_op = ScanOperator::build(table_name.clone(), table_catalog);
let plan = LogicalPlan {
operator: Operator::Analyze(AnalyzeOperator {
Ok(LogicalPlan::new(
Operator::Analyze(AnalyzeOperator {
table_name,
columns,
}),
childrens: vec![scan_op],
physical_option: None,
};
Ok(plan)
vec![scan_op],
))
}
}
22 changes: 10 additions & 12 deletions src/binder/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
};

if let Some(table) = self.context.table(Arc::new(table_name.to_string())) {
let columns = table.clone_columns();
let schema_ref = table.schema_ref().clone();
let ext_source = ExtSource {
path: match target {
CopyTarget::File { filename } => filename.into(),
Expand All @@ -83,22 +83,20 @@ impl<'a, T: Transaction> Binder<'a, T> {

if to {
// COPY <source_table> TO <dest_file>
Ok(LogicalPlan {
operator: Operator::CopyToFile(CopyToFileOperator { source: ext_source }),
childrens: vec![],
physical_option: None,
})
Ok(LogicalPlan::new(
Operator::CopyToFile(CopyToFileOperator { source: ext_source }),
vec![],
))
} else {
// COPY <dest_table> FROM <source_file>
Ok(LogicalPlan {
operator: Operator::CopyFromFile(CopyFromFileOperator {
Ok(LogicalPlan::new(
Operator::CopyFromFile(CopyFromFileOperator {
source: ext_source,
columns,
schema_ref,
table: table_name.to_string(),
}),
childrens: vec![],
physical_option: None,
})
vec![],
))
}
} else {
Err(DatabaseError::InvalidTable(format!(
Expand Down
9 changes: 4 additions & 5 deletions src/binder/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,14 @@ impl<'a, T: Transaction> Binder<'a, T> {
));
}

let plan = LogicalPlan {
operator: Operator::CreateTable(CreateTableOperator {
let plan = LogicalPlan::new(
Operator::CreateTable(CreateTableOperator {
table_name,
columns,
if_not_exists,
}),
childrens: vec![],
physical_option: None,
};
vec![],
);
Ok(plan)
}

Expand Down
15 changes: 7 additions & 8 deletions src/binder/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ impl<'a, T: Transaction> Binder<'a, T> {

let table_catalog = self.context.table_and_bind(table_name.clone(), None)?;
let primary_key_column = table_catalog
.columns_with_id()
.find(|(_, column)| column.desc.is_primary)
.map(|(_, column)| Arc::clone(column))
.columns()
.find(|column| column.desc.is_primary)
.cloned()
.unwrap();
let mut plan = ScanOperator::build(table_name.clone(), table_catalog);

Expand All @@ -34,14 +34,13 @@ impl<'a, T: Transaction> Binder<'a, T> {
plan = self.bind_where(plan, predicate)?;
}

Ok(LogicalPlan {
operator: Operator::Delete(DeleteOperator {
Ok(LogicalPlan::new(
Operator::Delete(DeleteOperator {
table_name,
primary_key_column,
}),
childrens: vec![plan],
physical_option: None,
})
vec![plan],
))
} else {
unreachable!("only table")
}
Expand Down
9 changes: 4 additions & 5 deletions src/binder/describe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@ impl<'a, T: Transaction> Binder<'a, T> {
) -> Result<LogicalPlan, DatabaseError> {
let table_name = Arc::new(lower_case_name(name)?);

Ok(LogicalPlan {
operator: Operator::Describe(DescribeOperator { table_name }),
childrens: vec![],
physical_option: None,
})
Ok(LogicalPlan::new(
Operator::Describe(DescribeOperator { table_name }),
vec![],
))
}
}
4 changes: 3 additions & 1 deletion src/binder/distinct.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::binder::Binder;
use crate::binder::{Binder, QueryBindStep};
use crate::expression::ScalarExpression;
use crate::planner::operator::aggregate::AggregateOperator;
use crate::planner::LogicalPlan;
Expand All @@ -10,6 +10,8 @@ impl<'a, T: Transaction> Binder<'a, T> {
children: LogicalPlan,
select_list: Vec<ScalarExpression>,
) -> LogicalPlan {
self.context.step(QueryBindStep::Distinct);

AggregateOperator::build(children, vec![], select_list)
}
}
9 changes: 4 additions & 5 deletions src/binder/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@ impl<'a, T: Transaction> Binder<'a, T> {
) -> Result<LogicalPlan, DatabaseError> {
let table_name = Arc::new(lower_case_name(name)?);

let plan = LogicalPlan {
operator: Operator::DropTable(DropTableOperator {
let plan = LogicalPlan::new(
Operator::DropTable(DropTableOperator {
table_name,
if_exists: *if_exists,
}),
childrens: vec![],
physical_option: None,
};
vec![],
);
Ok(plan)
}
}
6 changes: 1 addition & 5 deletions src/binder/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@ use crate::storage::Transaction;

impl<'a, T: Transaction> Binder<'a, T> {
pub(crate) fn bind_explain(&mut self, plan: LogicalPlan) -> Result<LogicalPlan, DatabaseError> {
Ok(LogicalPlan {
operator: Operator::Explain,
childrens: vec![plan],
physical_option: None,
})
Ok(LogicalPlan::new(Operator::Explain, vec![plan]))
}
}
28 changes: 26 additions & 2 deletions src/binder/expr.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::catalog::ColumnCatalog;
use crate::errors::DatabaseError;
use crate::expression;
use crate::expression::agg::AggKind;
Expand All @@ -9,7 +10,7 @@ use std::slice;
use std::sync::Arc;

use super::{lower_ident, Binder};
use crate::expression::ScalarExpression;
use crate::expression::{AliasType, ScalarExpression};
use crate::storage::Transaction;
use crate::types::value::DataValue;
use crate::types::LogicalType;
Expand All @@ -19,7 +20,7 @@ macro_rules! try_alias {
if let Some(expr) = $context.expr_aliases.get(&$column_name) {
return Ok(ScalarExpression::Alias {
expr: Box::new(expr.clone()),
alias: $column_name,
alias: AliasType::Name($column_name),
});
}
};
Expand Down Expand Up @@ -89,6 +90,29 @@ impl<'a, T: Transaction> Binder<'a, T> {
from_expr,
})
}
Expr::Subquery(query) => {
let mut sub_query = self.bind_query(query)?;
let sub_query_schema = sub_query.output_schema();

if sub_query_schema.len() != 1 {
return Err(DatabaseError::MisMatch(
"expects only one expression to be returned".to_string(),
"the expression returned by the subquery".to_string(),
));
}
let column = sub_query_schema[0].clone();
let mut alias_column = ColumnCatalog::clone(&column);
alias_column.set_table_name(self.context.temp_table());

self.context.sub_query(sub_query);

Ok(ScalarExpression::Alias {
expr: Box::new(ScalarExpression::ColumnRef(column)),
alias: AliasType::Expr(Box::new(ScalarExpression::ColumnRef(Arc::new(
alias_column,
)))),
})
}
_ => {
todo!()
}
Expand Down
Loading

0 comments on commit 86aaf60

Please sign in to comment.