Skip to content

Commit

Permalink
feat: support Union (#139)
Browse files Browse the repository at this point in the history
* feat: support `Union`

* docs: add `DataValue::Tuple` and `DQL::Union`
  • Loading branch information
KKould authored Feb 14, 2024
1 parent d1d5202 commit 0270f6b
Show file tree
Hide file tree
Showing 20 changed files with 360 additions and 51 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ implement_from_tuple!(
- [x] Show Tables
- [x] Explain
- [x] Describe
- [x] Union
- DML
- [x] Insert
- [x] Insert Overwrite
Expand All @@ -162,6 +163,7 @@ implement_from_tuple!(
- Varchar
- Date
- DateTime
- Tuple

## Roadmap
- SQL 2016
Expand Down
4 changes: 2 additions & 2 deletions src/binder/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ impl<'a, T: Transaction> Binder<'a, T> {

if sub_query_schema.len() != 1 {
return Err(DatabaseError::MisMatch(
"expects only one expression to be returned".to_string(),
"the expression returned by the subquery".to_string(),
"expects only one expression to be returned",
"the expression returned by the subquery",
));
}
let column = sub_query_schema[0].clone();
Expand Down
14 changes: 14 additions & 0 deletions src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,20 @@ impl<'a, T: Transaction> Binder<'a, T> {
};
Ok(plan)
}

pub fn bind_set_expr(&mut self, set_expr: &SetExpr) -> Result<LogicalPlan, DatabaseError> {
match set_expr {
SetExpr::Select(select) => self.bind_select(select, &[]),
SetExpr::Query(query) => self.bind_query(query),
SetExpr::SetOperation {
op,
set_quantifier,
left,
right,
} => self.bind_set_operation(op, set_quantifier, left, right),
_ => todo!(),
}
}
}

fn lower_ident(ident: &Ident) -> String {
Expand Down
102 changes: 95 additions & 7 deletions src/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ use crate::execution::volcano::dql::join::joins_nullable;
use crate::expression::{AliasType, BinaryOperator};
use crate::planner::operator::join::JoinCondition;
use crate::planner::operator::sort::{SortField, SortOperator};
use crate::planner::operator::union::UnionOperator;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use crate::types::tuple::Schema;
use crate::types::tuple::{Schema, SchemaRef};
use crate::types::LogicalType;
use itertools::Itertools;
use sqlparser::ast::{
Distinct, Expr, Ident, Join, JoinConstraint, JoinOperator, Offset, OrderByExpr, Query, Select,
SelectItem, SetExpr, TableAlias, TableFactor, TableWithJoins,
SelectItem, SetExpr, SetOperator, SetQuantifier, TableAlias, TableFactor, TableWithJoins,
};

impl<'a, T: Transaction> Binder<'a, T> {
Expand All @@ -41,6 +42,12 @@ impl<'a, T: Transaction> Binder<'a, T> {
let mut plan = match query.body.borrow() {
SetExpr::Select(select) => self.bind_select(select, &query.order_by),
SetExpr::Query(query) => self.bind_query(query),
SetExpr::SetOperation {
op,
set_quantifier,
left,
right,
} => self.bind_set_operation(op, set_quantifier, left, right),
_ => unimplemented!(),
}?;

Expand All @@ -54,7 +61,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
Ok(plan)
}

fn bind_select(
pub(crate) fn bind_select(
&mut self,
select: &Select,
orderby: &[OrderByExpr],
Expand Down Expand Up @@ -107,6 +114,90 @@ impl<'a, T: Transaction> Binder<'a, T> {
Ok(plan)
}

pub(crate) fn bind_set_operation(
&mut self,
op: &SetOperator,
set_quantifier: &SetQuantifier,
left: &SetExpr,
right: &SetExpr,
) -> Result<LogicalPlan, DatabaseError> {
let is_all = match set_quantifier {
SetQuantifier::All => true,
SetQuantifier::Distinct | SetQuantifier::None => false,
};
let mut left_plan = self.bind_set_expr(left)?;
let mut right_plan = self.bind_set_expr(right)?;
let fn_eq = |left_schema: &SchemaRef, right_schema: &SchemaRef| {
let left_len = left_schema.len();

if left_len != right_schema.len() {
return false;
}
for i in 0..left_len {
if left_schema[i].datatype() != right_schema[i].datatype() {
return false;
}
}
true
};
match (op, is_all) {
(SetOperator::Union, true) => {
let left_schema = left_plan.output_schema();
let right_schema = right_plan.output_schema();

if !fn_eq(left_schema, right_schema) {
return Err(DatabaseError::MisMatch(
"the output types on the left",
"the output types on the right",
));
}
Ok(UnionOperator::build(
left_schema.clone(),
right_schema.clone(),
left_plan,
right_plan,
))
}
(SetOperator::Union, false) => {
let left_schema = left_plan.output_schema();
let right_schema = right_plan.output_schema();

if !fn_eq(left_schema, right_schema) {
return Err(DatabaseError::MisMatch(
"the output types on the left",
"the output types on the right",
));
}
let union_op = Operator::Union(UnionOperator {
left_schema_ref: left_schema.clone(),
right_schema_ref: right_schema.clone(),
});
let distinct_exprs = left_schema
.iter()
.cloned()
.map(ScalarExpression::ColumnRef)
.collect_vec();

Ok(self.bind_distinct(
LogicalPlan::new(union_op, vec![left_plan, right_plan]),
distinct_exprs,
))
}
(SetOperator::Intersect, true) => {
todo!()
}
(SetOperator::Intersect, false) => {
todo!()
}
(SetOperator::Except, true) => {
todo!()
}
(SetOperator::Except, false) => {
todo!()
}
}
}

pub(crate) fn bind_table_ref(
&mut self,
from: &[TableWithJoins],
Expand Down Expand Up @@ -192,10 +283,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
.ok_or(DatabaseError::TableNotFound)?;

if alias_column.len() != table.columns_len() {
return Err(DatabaseError::MisMatch(
"Alias".to_string(),
"Columns".to_string(),
));
return Err(DatabaseError::MisMatch("alias", "columns"));
}
let aliases_with_columns = alias_column
.iter()
Expand Down
7 changes: 7 additions & 0 deletions src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ impl ColumnCatalog {
&self.summary.name
}

pub fn full_name(&self) -> String {
if let Some(table_name) = self.table_name() {
return format!("{}.{}", table_name, self.name());
}
self.name().to_string()
}

pub fn table_name(&self) -> Option<&TableName> {
self.summary.table_name.as_ref()
}
Expand Down
2 changes: 1 addition & 1 deletion src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub enum DatabaseError {
FromUtf8Error,
),
#[error("{0} and {1} do not match")]
MisMatch(String, String),
MisMatch(&'static str, &'static str),
#[error("io: {0}")]
IO(
#[source]
Expand Down
1 change: 1 addition & 0 deletions src/execution/volcano/dql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub(crate) mod projection;
pub(crate) mod seq_scan;
pub(crate) mod show_table;
pub(crate) mod sort;
pub(crate) mod union;
pub(crate) mod values;

#[cfg(test)]
Expand Down
45 changes: 45 additions & 0 deletions src/execution/volcano/dql/union.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use crate::errors::DatabaseError;
use crate::execution::volcano::{build_read, BoxedExecutor, ReadExecutor};
use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use crate::types::tuple::Tuple;
use futures_async_stream::try_stream;

pub struct Union {
left_input: LogicalPlan,
right_input: LogicalPlan,
}

impl From<(LogicalPlan, LogicalPlan)> for Union {
fn from((left_input, right_input): (LogicalPlan, LogicalPlan)) -> Self {
Union {
left_input,
right_input,
}
}
}

impl<T: Transaction> ReadExecutor<T> for Union {
fn execute(self, transaction: &T) -> BoxedExecutor {
self._execute(transaction)
}
}

impl Union {
#[try_stream(boxed, ok = Tuple, error = DatabaseError)]
pub async fn _execute<T: Transaction>(self, transaction: &T) {
let Union {
left_input,
right_input,
} = self;

#[for_await]
for tuple in build_read(left_input, transaction) {
yield tuple?;
}
#[for_await]
for tuple in build_read(right_input, transaction) {
yield tuple?;
}
}
}
7 changes: 7 additions & 0 deletions src/execution/volcano/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::execution::volcano::dql::projection::Projection;
use crate::execution::volcano::dql::seq_scan::SeqScan;
use crate::execution::volcano::dql::show_table::ShowTables;
use crate::execution::volcano::dql::sort::Sort;
use crate::execution::volcano::dql::union::Union;
use crate::execution::volcano::dql::values::Values;
use crate::planner::operator::{Operator, PhysicalOption};
use crate::planner::LogicalPlan;
Expand Down Expand Up @@ -109,6 +110,12 @@ pub fn build_read<T: Transaction>(plan: LogicalPlan, transaction: &T) -> BoxedEx
Explain::from(input).execute(transaction)
}
Operator::Describe(op) => Describe::from(op).execute(transaction),
Operator::Union(_) => {
let left_input = childrens.remove(0);
let right_input = childrens.remove(0);

Union::from((left_input, right_input)).execute(transaction)
}
_ => unreachable!(),
}
}
Expand Down
7 changes: 1 addition & 6 deletions src/expression/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,12 +364,7 @@ impl ScalarExpression {
pub fn output_name(&self) -> String {
match self {
ScalarExpression::Constant(value) => format!("{}", value),
ScalarExpression::ColumnRef(col) => {
if let Some(table_name) = col.table_name() {
return format!("{}.{}", table_name, col.name());
}
col.name().to_string()
}
ScalarExpression::ColumnRef(col) => col.full_name(),
ScalarExpression::Alias { alias, expr } => match alias {
AliasType::Name(alias) => alias.to_string(),
AliasType::Expr(alias_expr) => {
Expand Down
6 changes: 5 additions & 1 deletion src/optimizer/rule/normalization/column_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,11 @@ impl ColumnPruning {
.retain(|(_, column)| column_references.contains(column.summary()));
}
}
Operator::Sort(_) | Operator::Limit(_) | Operator::Join(_) | Operator::Filter(_) => {
Operator::Sort(_)
| Operator::Limit(_)
| Operator::Join(_)
| Operator::Filter(_)
| Operator::Union(_) => {
let temp_columns = operator.referenced_columns(false);
// why?
let mut column_references = column_references;
Expand Down
3 changes: 2 additions & 1 deletion src/optimizer/rule/normalization/expression_remapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ impl ExpressionRemapper {
| Operator::DropTable(_)
| Operator::Truncate(_)
| Operator::CopyFromFile(_)
| Operator::CopyToFile(_) => (),
| Operator::CopyToFile(_)
| Operator::Union(_) => (),
}
if let Some(exprs) = operator.output_exprs() {
*output_exprs = exprs;
Expand Down
12 changes: 11 additions & 1 deletion src/planner/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
pub mod operator;

use crate::catalog::TableName;
use crate::planner::operator::union::UnionOperator;
use crate::planner::operator::values::ValuesOperator;
use crate::planner::operator::{Operator, PhysicalOption};
use crate::types::tuple::SchemaRef;
use itertools::Itertools;
Expand Down Expand Up @@ -83,7 +85,15 @@ impl LogicalPlan {
.collect_vec();
Arc::new(out_columns)
}
Operator::Values(op) => op.schema_ref.clone(),
Operator::Values(ValuesOperator { schema_ref, .. }) => schema_ref.clone(),
Operator::Union(UnionOperator {
left_schema_ref,
right_schema_ref,
}) => {
let mut schema = Vec::clone(left_schema_ref);
schema.extend_from_slice(right_schema_ref.as_slice());
Arc::new(schema)
}
Operator::Dummy
| Operator::Show
| Operator::Explain
Expand Down
Loading

0 comments on commit 0270f6b

Please sign in to comment.