Skip to content

Commit

Permalink
feat: support Subquery on Where
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Feb 11, 2024
1 parent 5f8b04b commit d96b2c3
Show file tree
Hide file tree
Showing 61 changed files with 583 additions and 370 deletions.
2 changes: 1 addition & 1 deletion src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ fn encode_tuples<'a>(tuples: Vec<Tuple>) -> PgWireResult<QueryResponse<'a>> {
let mut results = Vec::with_capacity(tuples.len());
let schema = Arc::new(
tuples[0]
.columns
.schema_ref
.iter()
.map(|column| {
let pg_type = into_pg_type(column.datatype())?;
Expand Down
8 changes: 6 additions & 2 deletions src/binder/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
planner::operator::{aggregate::AggregateOperator, sort::SortField},
};

use super::Binder;
use super::{Binder, QueryBindStep};

impl<'a, T: Transaction> Binder<'a, T> {
pub fn bind_aggregate(
Expand All @@ -20,6 +20,8 @@ impl<'a, T: Transaction> Binder<'a, T> {
agg_calls: Vec<ScalarExpression>,
groupby_exprs: Vec<ScalarExpression>,
) -> LogicalPlan {
self.context.step(QueryBindStep::Agg);

AggregateOperator::build(children, agg_calls, groupby_exprs)
}

Expand Down Expand Up @@ -133,7 +135,8 @@ impl<'a, T: Transaction> Binder<'a, T> {
self.visit_column_agg_expr(expr)?;
}
}
ScalarExpression::Constant(_) | ScalarExpression::ColumnRef { .. } => {}
ScalarExpression::Constant(_) | ScalarExpression::ColumnRef { .. } => (),
ScalarExpression::Empty => unreachable!(),
}

Ok(())
Expand Down Expand Up @@ -306,6 +309,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
Ok(())
}
ScalarExpression::Constant(_) => Ok(()),
ScalarExpression::Empty => unreachable!(),
}
}
}
18 changes: 8 additions & 10 deletions src/binder/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,14 @@ impl<'a, T: Transaction> Binder<'a, T> {
"illegal column naming".to_string(),
));
}
LogicalPlan {
operator: Operator::AddColumn(AddColumnOperator {
LogicalPlan::new(
Operator::AddColumn(AddColumnOperator {
table_name,
if_not_exists: *if_not_exists,
column,
}),
childrens: vec![plan],
physical_option: None,
}
vec![plan],
)
}
AlterTableOperation::DropColumn {
column_name,
Expand All @@ -53,15 +52,14 @@ impl<'a, T: Transaction> Binder<'a, T> {
let plan = ScanOperator::build(table_name.clone(), table);
let column_name = column_name.value.clone();

LogicalPlan {
operator: Operator::DropColumn(DropColumnOperator {
LogicalPlan::new(
Operator::DropColumn(DropColumnOperator {
table_name,
if_exists: *if_exists,
column_name,
}),
childrens: vec![plan],
physical_option: None,
}
vec![plan],
)
}
AlterTableOperation::DropPrimaryKey => todo!(),
AlterTableOperation::RenameColumn {
Expand Down
14 changes: 6 additions & 8 deletions src/binder/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,17 @@ impl<'a, T: Transaction> Binder<'a, T> {

let table_catalog = self.context.table_and_bind(table_name.clone(), None)?;
let columns = table_catalog
.columns_with_id()
.filter_map(|(_, column)| column.desc.is_index().then_some(column.clone()))
.columns()
.filter_map(|column| column.desc.is_index().then_some(column.clone()))
.collect_vec();

let scan_op = ScanOperator::build(table_name.clone(), table_catalog);
let plan = LogicalPlan {
operator: Operator::Analyze(AnalyzeOperator {
Ok(LogicalPlan::new(
Operator::Analyze(AnalyzeOperator {
table_name,
columns,
}),
childrens: vec![scan_op],
physical_option: None,
};
Ok(plan)
vec![scan_op],
))
}
}
22 changes: 10 additions & 12 deletions src/binder/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
};

if let Some(table) = self.context.table(Arc::new(table_name.to_string())) {
let columns = table.clone_columns();
let schema_ref = table.schema_ref().clone();
let ext_source = ExtSource {
path: match target {
CopyTarget::File { filename } => filename.into(),
Expand All @@ -83,22 +83,20 @@ impl<'a, T: Transaction> Binder<'a, T> {

if to {
// COPY <source_table> TO <dest_file>
Ok(LogicalPlan {
operator: Operator::CopyToFile(CopyToFileOperator { source: ext_source }),
childrens: vec![],
physical_option: None,
})
Ok(LogicalPlan::new(
Operator::CopyToFile(CopyToFileOperator { source: ext_source }),
vec![],
))
} else {
// COPY <dest_table> FROM <source_file>
Ok(LogicalPlan {
operator: Operator::CopyFromFile(CopyFromFileOperator {
Ok(LogicalPlan::new(
Operator::CopyFromFile(CopyFromFileOperator {
source: ext_source,
columns,
schema_ref,
table: table_name.to_string(),
}),
childrens: vec![],
physical_option: None,
})
vec![],
))
}
} else {
Err(DatabaseError::InvalidTable(format!(
Expand Down
9 changes: 4 additions & 5 deletions src/binder/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,14 @@ impl<'a, T: Transaction> Binder<'a, T> {
));
}

let plan = LogicalPlan {
operator: Operator::CreateTable(CreateTableOperator {
let plan = LogicalPlan::new(
Operator::CreateTable(CreateTableOperator {
table_name,
columns,
if_not_exists,
}),
childrens: vec![],
physical_option: None,
};
vec![],
);
Ok(plan)
}

Expand Down
15 changes: 7 additions & 8 deletions src/binder/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ impl<'a, T: Transaction> Binder<'a, T> {

let table_catalog = self.context.table_and_bind(table_name.clone(), None)?;
let primary_key_column = table_catalog
.columns_with_id()
.find(|(_, column)| column.desc.is_primary)
.map(|(_, column)| Arc::clone(column))
.columns()
.find(|column| column.desc.is_primary)
.cloned()
.unwrap();
let mut plan = ScanOperator::build(table_name.clone(), table_catalog);

Expand All @@ -34,14 +34,13 @@ impl<'a, T: Transaction> Binder<'a, T> {
plan = self.bind_where(plan, predicate)?;
}

Ok(LogicalPlan {
operator: Operator::Delete(DeleteOperator {
Ok(LogicalPlan::new(
Operator::Delete(DeleteOperator {
table_name,
primary_key_column,
}),
childrens: vec![plan],
physical_option: None,
})
vec![plan],
))
} else {
unreachable!("only table")
}
Expand Down
9 changes: 4 additions & 5 deletions src/binder/describe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@ impl<'a, T: Transaction> Binder<'a, T> {
) -> Result<LogicalPlan, DatabaseError> {
let table_name = Arc::new(lower_case_name(name)?);

Ok(LogicalPlan {
operator: Operator::Describe(DescribeOperator { table_name }),
childrens: vec![],
physical_option: None,
})
Ok(LogicalPlan::new(
Operator::Describe(DescribeOperator { table_name }),
vec![],
))
}
}
4 changes: 3 additions & 1 deletion src/binder/distinct.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::binder::Binder;
use crate::binder::{Binder, QueryBindStep};
use crate::expression::ScalarExpression;
use crate::planner::operator::aggregate::AggregateOperator;
use crate::planner::LogicalPlan;
Expand All @@ -10,6 +10,8 @@ impl<'a, T: Transaction> Binder<'a, T> {
children: LogicalPlan,
select_list: Vec<ScalarExpression>,
) -> LogicalPlan {
self.context.step(QueryBindStep::Distinct);

AggregateOperator::build(children, vec![], select_list)
}
}
9 changes: 4 additions & 5 deletions src/binder/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@ impl<'a, T: Transaction> Binder<'a, T> {
) -> Result<LogicalPlan, DatabaseError> {
let table_name = Arc::new(lower_case_name(name)?);

let plan = LogicalPlan {
operator: Operator::DropTable(DropTableOperator {
let plan = LogicalPlan::new(
Operator::DropTable(DropTableOperator {
table_name,
if_exists: *if_exists,
}),
childrens: vec![],
physical_option: None,
};
vec![],
);
Ok(plan)
}
}
6 changes: 1 addition & 5 deletions src/binder/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@ use crate::storage::Transaction;

impl<'a, T: Transaction> Binder<'a, T> {
pub(crate) fn bind_explain(&mut self, plan: LogicalPlan) -> Result<LogicalPlan, DatabaseError> {
Ok(LogicalPlan {
operator: Operator::Explain,
childrens: vec![plan],
physical_option: None,
})
Ok(LogicalPlan::new(Operator::Explain, vec![plan]))
}
}
15 changes: 15 additions & 0 deletions src/binder/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,21 @@ impl<'a, T: Transaction> Binder<'a, T> {
from_expr,
})
}
Expr::Subquery(query) => {
let mut sub_query = self.bind_query(query)?;
let sub_query_schema = sub_query.out_schmea();

if sub_query_schema.len() > 1 {
return Err(DatabaseError::MisMatch(
"expects only one expression to be returned".to_string(),
"the expression returned by the subquery".to_string(),
));
}
let expr = ScalarExpression::ColumnRef(sub_query_schema[0].clone());
self.context.sub_query(sub_query);

Ok(expr)
}
_ => {
todo!()
}
Expand Down
46 changes: 26 additions & 20 deletions src/binder/insert.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::binder::{lower_case_name, Binder};
use crate::catalog::ColumnRef;
use crate::errors::DatabaseError;
use crate::expression::value_compute::unary_op;
use crate::expression::ScalarExpression;
Expand All @@ -8,6 +7,7 @@ use crate::planner::operator::values::ValuesOperator;
use crate::planner::operator::Operator;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use crate::types::tuple::SchemaRef;
use crate::types::value::{DataValue, ValueRef};
use sqlparser::ast::{Expr, Ident, ObjectName};
use std::slice;
Expand All @@ -24,15 +24,20 @@ impl<'a, T: Transaction> Binder<'a, T> {
let table_name = Arc::new(lower_case_name(name)?);

if let Some(table) = self.context.table(table_name.clone()) {
let mut columns = Vec::new();
let mut _schema_ref = None;
let values_len = expr_rows[0].len();

if idents.is_empty() {
columns = table.clone_columns();
if values_len > columns.len() {
return Err(DatabaseError::ValuesLenMismatch(columns.len(), values_len));
let temp_schema_ref = table.schema_ref().clone();
if values_len > temp_schema_ref.len() {
return Err(DatabaseError::ValuesLenMismatch(
temp_schema_ref.len(),
values_len,
));
}
_schema_ref = Some(temp_schema_ref);
} else {
let mut columns = Vec::with_capacity(idents.len());
for ident in idents {
match self.bind_column_ref_from_identifiers(
slice::from_ref(ident),
Expand All @@ -45,7 +50,9 @@ impl<'a, T: Transaction> Binder<'a, T> {
if values_len != columns.len() {
return Err(DatabaseError::ValuesLenMismatch(columns.len(), values_len));
}
_schema_ref = Some(Arc::new(columns));
}
let schema_ref = _schema_ref.ok_or(DatabaseError::ColumnsEmpty)?;
let mut rows = Vec::with_capacity(expr_rows.len());
for expr_row in expr_rows {
if expr_row.len() != values_len {
Expand All @@ -57,14 +64,15 @@ impl<'a, T: Transaction> Binder<'a, T> {
match &self.bind_expr(expr)? {
ScalarExpression::Constant(value) => {
// Check if the value length is too long
value.check_len(columns[i].datatype())?;
let cast_value = DataValue::clone(value).cast(columns[i].datatype())?;
value.check_len(schema_ref[i].datatype())?;
let cast_value =
DataValue::clone(value).cast(schema_ref[i].datatype())?;
row.push(Arc::new(cast_value))
}
ScalarExpression::Unary { expr, op, .. } => {
if let ScalarExpression::Constant(value) = expr.as_ref() {
row.push(Arc::new(
unary_op(value, op)?.cast(columns[i].datatype())?,
unary_op(value, op)?.cast(schema_ref[i].datatype())?,
))
} else {
unreachable!()
Expand All @@ -76,16 +84,15 @@ impl<'a, T: Transaction> Binder<'a, T> {

rows.push(row);
}
let values_plan = self.bind_values(rows, columns);
let values_plan = self.bind_values(rows, schema_ref);

Ok(LogicalPlan {
operator: Operator::Insert(InsertOperator {
Ok(LogicalPlan::new(
Operator::Insert(InsertOperator {
table_name,
is_overwrite,
}),
childrens: vec![values_plan],
physical_option: None,
})
vec![values_plan],
))
} else {
Err(DatabaseError::InvalidTable(format!(
"not found table {}",
Expand All @@ -97,12 +104,11 @@ impl<'a, T: Transaction> Binder<'a, T> {
pub(crate) fn bind_values(
&mut self,
rows: Vec<Vec<ValueRef>>,
columns: Vec<ColumnRef>,
schema_ref: SchemaRef,
) -> LogicalPlan {
LogicalPlan {
operator: Operator::Values(ValuesOperator { rows, columns }),
childrens: vec![],
physical_option: None,
}
LogicalPlan::new(
Operator::Values(ValuesOperator { rows, schema_ref }),
vec![],
)
}
}
Loading

0 comments on commit d96b2c3

Please sign in to comment.