From 577871238b74c23bf2d5a61e2466b5c98b988030 Mon Sep 17 00:00:00 2001
From: Kould <2435992353@qq.com>
Date: Thu, 29 Feb 2024 03:26:28 +0800
Subject: [PATCH] Support `Subquery` on `WHERE` with `IN/Not IN` (#147)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* invalid: fix index's `tuple_ids` -> `tuple_id`
* refactor: remove Tuple's Schema
* fix: `TestFunction`
* feat: support `LeftSemi` & `LeftAnti`
* feat: support Subquery on `WHERE` with `IN/Not IN`
* style: remove `Transaction::drop_column`'s `if_not_exists`
* test: more cases for `insubquery`
* code fmt
---
 examples/hello_world.rs | 9 +-
 src/bin/server.rs | 11 +-
 src/binder/aggregate.rs | 2 +-
 src/binder/expr.rs | 86 ++--
 src/binder/mod.rs | 16 +-
 src/binder/select.rs | 84 ++--
 src/binder/update.rs | 6 +-
 src/catalog/table.rs | 6 +-
 src/db.rs | 371 ++----
 src/execution/volcano/ddl/add_column.rs | 12 +-
 src/execution/volcano/ddl/create_table.rs | 5 +-
 src/execution/volcano/ddl/drop_column.rs | 68 ++--
 src/execution/volcano/ddl/drop_table.rs | 4 +-
 src/execution/volcano/ddl/truncate.rs | 3 +
 src/execution/volcano/dml/analyze.rs | 20 +-
 src/execution/volcano/dml/copy_from_file.rs | 11 +-
 src/execution/volcano/dml/insert.rs | 46 +--
 src/execution/volcano/dml/update.rs | 20 +-
 src/execution/volcano/dql/aggregate/avg.rs | 3 +
 .../volcano/dql/aggregate/hash_agg.rs | 45 ++-
 .../volcano/dql/aggregate/simple_agg.rs | 34 +-
 src/execution/volcano/dql/describe.rs | 13 +-
 src/execution/volcano/dql/dummy.rs | 2 -
 src/execution/volcano/dql/explain.rs | 8 +-
 src/execution/volcano/dql/filter.rs | 8 +-
 src/execution/volcano/dql/join/hash_join.rs | 274 ++++++++-----
 src/execution/volcano/dql/join/mod.rs | 4 +-
 src/execution/volcano/dql/projection.rs | 16 +-
 src/execution/volcano/dql/show_table.rs | 12 +-
 src/execution/volcano/dql/sort.rs | 10 +-
 src/execution/volcano/dql/values.rs | 8 +-
 src/execution/volcano/mod.rs | 4 +-
 src/expression/evaluator.rs | 70 ++--
 src/expression/function.rs | 8 +-
 src/expression/mod.rs | 12 +-
 src/marcos/mod.rs | 34 +-
 .../rule/normalization/pushdown_limit.rs | 6 +-
 .../rule/normalization/pushdown_predicates.rs | 10 +-
 src/planner/mod.rs | 78 ++--
 src/planner/operator/join.rs | 6 +-
 src/planner/operator/mod.rs | 25 +-
 src/planner/operator/union.rs | 27 +-
 src/planner/operator/values.rs | 9 +-
 src/storage/kip.rs | 30 +-
 src/storage/mod.rs | 45 +--
 src/storage/table_codec.rs | 35 +-
 src/types/index.rs | 3 +-
 src/types/tuple.rs | 43 +-
 src/types/tuple_builder.rs | 48 +--
 tests/slt/sql_2016/E061_11.slt | 9 +-
 tests/slt/sql_2016/E061_13.slt | 24 +-
 tests/slt/sql_2016/E091_01.slt | 2 +
 tests/slt/sql_2016/E091_02.slt | 4 +
 tests/slt/sql_2016/E091_03.slt | 2 +
 tests/slt/sql_2016/E091_04.slt | 2 +
 tests/slt/sql_2016/E091_05.slt | 2 +
 tests/slt/sql_2016/E091_06.slt | 10 +
 tests/slt/sql_2016/E091_07.slt | 10 +
 tests/slt/subquery.slt | 25 ++
 tests/sqllogictest/src/lib.rs | 4 +-
 60 files changed, 796 insertions(+), 1008 deletions(-)

diff --git a/examples/hello_world.rs b/examples/hello_world.rs
index ca82fb75..3b3a6795 100644
--- a/examples/hello_world.rs
+++ b/examples/hello_world.rs
@@ -1,7 +1,7 @@
 use fnck_sql::db::DataBaseBuilder;
 use fnck_sql::errors::DatabaseError;
 use fnck_sql::implement_from_tuple;
-use fnck_sql::types::tuple::Tuple;
+use fnck_sql::types::tuple::{SchemaRef, Tuple};
 use fnck_sql::types::value::DataValue;
 use fnck_sql::types::LogicalType;
 use itertools::Itertools;
@@ -38,11 +38,10 @@ async fn main() -> Result<(), DatabaseError> {
     let _ = database
         .run("insert
into my_struct values(0, 0), (1, 1)") .await?; - let tuples = database - .run("select * from my_struct") - .await? + let (schema, tuples) = database.run("select * from my_struct").await?; + let tuples = tuples .into_iter() - .map(MyStruct::from) + .map(|tuple| MyStruct::from((&schema, tuple))) .collect_vec(); println!("{:#?}", tuples); diff --git a/src/bin/server.rs b/src/bin/server.rs index cc23b2eb..1cd70384 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -3,7 +3,7 @@ use clap::Parser; use fnck_sql::db::{DBTransaction, DataBaseBuilder, Database}; use fnck_sql::errors::DatabaseError; use fnck_sql::storage::kip::KipStorage; -use fnck_sql::types::tuple::Tuple; +use fnck_sql::types::tuple::{Schema, Tuple}; use fnck_sql::types::LogicalType; use futures::stream; use log::{error, info, LevelFilter}; @@ -146,28 +146,27 @@ impl SimpleQueryHandler for SessionBackend { _ => { let mut guard = self.tx.lock().await; - let tuples = if let Some(transaction) = guard.as_mut() { + let (schema, tuples) = if let Some(transaction) = guard.as_mut() { transaction.run(query).await } else { self.inner.run(query).await } .map_err(|e| PgWireError::ApiError(Box::new(e)))?; - Ok(vec![Response::Query(encode_tuples(tuples)?)]) + Ok(vec![Response::Query(encode_tuples(&schema, tuples)?)]) } } } } -fn encode_tuples<'a>(tuples: Vec) -> PgWireResult> { +fn encode_tuples<'a>(schema: &Schema, tuples: Vec) -> PgWireResult> { if tuples.is_empty() { return Ok(QueryResponse::new(Arc::new(vec![]), stream::empty())); } let mut results = Vec::with_capacity(tuples.len()); let schema = Arc::new( - tuples[0] - .schema_ref + schema .iter() .map(|column| { let pg_type = into_pg_type(column.datatype())?; diff --git a/src/binder/aggregate.rs b/src/binder/aggregate.rs index 6c51fd44..c8c1764d 100644 --- a/src/binder/aggregate.rs +++ b/src/binder/aggregate.rs @@ -301,7 +301,7 @@ impl<'a, T: Transaction> Binder<'a, T> { return Ok(()); } if matches!(expr, ScalarExpression::Alias { .. 
}) { - return self.validate_having_orderby(expr.unpack_alias()); + return self.validate_having_orderby(expr.unpack_alias_ref()); } Err(DatabaseError::AggMiss( diff --git a/src/binder/expr.rs b/src/binder/expr.rs index d50354f7..94c7ed23 100644 --- a/src/binder/expr.rs +++ b/src/binder/expr.rs @@ -1,17 +1,19 @@ -use crate::catalog::ColumnCatalog; +use crate::catalog::{ColumnCatalog, ColumnRef}; use crate::errors::DatabaseError; use crate::expression; use crate::expression::agg::AggKind; use itertools::Itertools; use sqlparser::ast::{ - BinaryOperator, DataType, Expr, Function, FunctionArg, FunctionArgExpr, Ident, UnaryOperator, + BinaryOperator, DataType, Expr, Function, FunctionArg, FunctionArgExpr, Ident, Query, + UnaryOperator, }; use std::slice; use std::sync::Arc; -use super::{lower_ident, Binder, QueryBindStep}; +use super::{lower_ident, Binder, QueryBindStep, SubQueryType}; use crate::expression::function::{FunctionSummary, ScalarFunction}; use crate::expression::{AliasType, ScalarExpression}; +use crate::planner::LogicalPlan; use crate::storage::Transaction; use crate::types::value::DataValue; use crate::types::LogicalType; @@ -99,33 +101,40 @@ impl<'a, T: Transaction> Binder<'a, T> { from_expr, }) } - Expr::Subquery(query) => { - let mut sub_query = self.bind_query(query)?; - let sub_query_schema = sub_query.output_schema(); - - if sub_query_schema.len() != 1 { - return Err(DatabaseError::MisMatch( - "expects only one expression to be returned", - "the expression returned by the subquery", - )); - } - let column = sub_query_schema[0].clone(); - self.context.sub_query(sub_query); + Expr::Subquery(subquery) => { + let (sub_query, column) = self.bind_subquery(subquery)?; + self.context.sub_query(SubQueryType::SubQuery(sub_query)); if self.context.is_step(&QueryBindStep::Where) { - let mut alias_column = ColumnCatalog::clone(&column); - alias_column.set_table_name(self.context.temp_table()); - - Ok(ScalarExpression::Alias { - expr: Box::new(ScalarExpression::ColumnRef(column)), - alias: AliasType::Expr(Box::new(ScalarExpression::ColumnRef(Arc::new( - alias_column, - )))), - }) + Ok(self.bind_temp_column(column)) } else { Ok(ScalarExpression::ColumnRef(column)) } } + Expr::InSubquery { + expr, + subquery, + negated, + } => { + let (sub_query, column) = self.bind_subquery(subquery)?; + self.context + .sub_query(SubQueryType::InSubQuery(*negated, sub_query)); + + if !self.context.is_step(&QueryBindStep::Where) { + return Err(DatabaseError::UnsupportedStmt( + "`in subquery` can only appear in `Where`".to_string(), + )); + } + + let alias_expr = self.bind_temp_column(column); + + Ok(ScalarExpression::Binary { + op: expression::BinaryOperator::Eq, + left_expr: Box::new(self.bind_expr(expr)?), + right_expr: Box::new(alias_expr), + ty: LogicalType::Boolean, + }) + } Expr::Tuple(exprs) => { let mut bond_exprs = Vec::with_capacity(exprs.len()); @@ -187,6 +196,35 @@ impl<'a, T: Transaction> Binder<'a, T> { } } + fn bind_temp_column(&mut self, column: ColumnRef) -> ScalarExpression { + let mut alias_column = ColumnCatalog::clone(&column); + alias_column.set_table_name(self.context.temp_table()); + + ScalarExpression::Alias { + expr: Box::new(ScalarExpression::ColumnRef(column)), + alias: AliasType::Expr(Box::new(ScalarExpression::ColumnRef(Arc::new( + alias_column, + )))), + } + } + + fn bind_subquery( + &mut self, + subquery: &Query, + ) -> Result<(LogicalPlan, Arc), DatabaseError> { + let mut sub_query = self.bind_query(subquery)?; + let sub_query_schema = sub_query.output_schema(); + + if 
sub_query_schema.len() != 1 { + return Err(DatabaseError::MisMatch( + "expects only one expression to be returned", + "the expression returned by the subquery", + )); + } + let column = sub_query_schema[0].clone(); + Ok((sub_query, column)) + } + pub fn bind_like( &mut self, negated: bool, diff --git a/src/binder/mod.rs b/src/binder/mod.rs index ecf30b51..1b777859 100644 --- a/src/binder/mod.rs +++ b/src/binder/mod.rs @@ -46,6 +46,12 @@ pub enum QueryBindStep { Limit, } +#[derive(Debug, Clone, Hash, Eq, PartialEq)] +pub enum SubQueryType { + SubQuery(LogicalPlan), + InSubQuery(bool, LogicalPlan), +} + #[derive(Clone)] pub struct BinderContext<'a, T: Transaction> { functions: &'a Functions, @@ -60,7 +66,7 @@ pub struct BinderContext<'a, T: Transaction> { pub(crate) agg_calls: Vec, bind_step: QueryBindStep, - sub_queries: HashMap>, + sub_queries: HashMap>, temp_table_id: usize, pub(crate) allow_default: bool, @@ -96,14 +102,18 @@ impl<'a, T: Transaction> BinderContext<'a, T> { &self.bind_step == bind_step } - pub fn sub_query(&mut self, sub_query: LogicalPlan) { + pub fn step_now(&self) -> QueryBindStep { + self.bind_step + } + + pub fn sub_query(&mut self, sub_query: SubQueryType) { self.sub_queries .entry(self.bind_step) .or_default() .push(sub_query) } - pub fn sub_queries_at_now(&mut self) -> Option> { + pub fn sub_queries_at_now(&mut self) -> Option> { self.sub_queries.remove(&self.bind_step) } diff --git a/src/binder/select.rs b/src/binder/select.rs index 035f3d7b..0fa37f48 100644 --- a/src/binder/select.rs +++ b/src/binder/select.rs @@ -14,7 +14,7 @@ use crate::{ types::value::DataValue, }; -use super::{lower_case_name, lower_ident, Binder, BinderContext, QueryBindStep}; +use super::{lower_case_name, lower_ident, Binder, QueryBindStep, SubQueryType}; use crate::catalog::{ColumnCatalog, ColumnSummary, TableName}; use crate::errors::DatabaseError; @@ -26,7 +26,7 @@ use crate::planner::operator::sort::{SortField, SortOperator}; use crate::planner::operator::union::UnionOperator; use crate::planner::LogicalPlan; use crate::storage::Transaction; -use crate::types::tuple::{Schema, SchemaRef}; +use crate::types::tuple::Schema; use crate::types::LogicalType; use itertools::Itertools; use sqlparser::ast::{ @@ -37,6 +37,8 @@ use sqlparser::ast::{ impl<'a, T: Transaction> Binder<'a, T> { pub(crate) fn bind_query(&mut self, query: &Query) -> Result { + let origin_step = self.context.step_now(); + if let Some(_with) = &query.with { // TODO support with clause. 
} @@ -60,6 +62,7 @@ impl<'a, T: Transaction> Binder<'a, T> { plan = self.bind_limit(plan, limit, offset)?; } + self.context.step(origin_step); Ok(plan) } @@ -148,7 +151,7 @@ impl<'a, T: Transaction> Binder<'a, T> { }; let mut left_plan = self.bind_set_expr(left)?; let mut right_plan = self.bind_set_expr(right)?; - let fn_eq = |left_schema: &SchemaRef, right_schema: &SchemaRef| { + let fn_eq = |left_schema: &Schema, right_schema: &Schema| { let left_len = left_schema.len(); if left_len != right_schema.len() { @@ -174,7 +177,6 @@ impl<'a, T: Transaction> Binder<'a, T> { } Ok(UnionOperator::build( left_schema.clone(), - right_schema.clone(), left_plan, right_plan, )) @@ -190,8 +192,7 @@ impl<'a, T: Transaction> Binder<'a, T> { )); } let union_op = Operator::Union(UnionOperator { - left_schema_ref: left_schema.clone(), - right_schema_ref: right_schema.clone(), + schema_ref: left_schema.clone(), }); let distinct_exprs = left_schema .iter() @@ -438,27 +439,17 @@ impl<'a, T: Transaction> Binder<'a, T> { let (join_type, joint_condition) = match join_operator { JoinOperator::Inner(constraint) => (JoinType::Inner, Some(constraint)), - JoinOperator::LeftOuter(constraint) => (JoinType::Left, Some(constraint)), - JoinOperator::RightOuter(constraint) => (JoinType::Right, Some(constraint)), + JoinOperator::LeftOuter(constraint) => (JoinType::LeftOuter, Some(constraint)), + JoinOperator::RightOuter(constraint) => (JoinType::RightOuter, Some(constraint)), JoinOperator::FullOuter(constraint) => (JoinType::Full, Some(constraint)), JoinOperator::CrossJoin => (JoinType::Cross, None), _ => unimplemented!(), }; let (right_table, right) = self.bind_single_table_ref(relation, Some(join_type))?; let right_table = Self::unpack_name(right_table, false); - let fn_table = |context: &BinderContext<_>, table| { - context - .table(table) - .map(|table| table.schema_ref()) - .cloned() - .ok_or(DatabaseError::TableNotFound) - }; - - let left_table = fn_table(&self.context, left_table.clone())?; - let right_table = fn_table(&self.context, right_table.clone())?; let on = match joint_condition { - Some(constraint) => self.bind_join_constraint(&left_table, &right_table, constraint)?, + Some(constraint) => self.bind_join_constraint(left_table, right_table, constraint)?, None => JoinCondition::None, }; @@ -475,16 +466,28 @@ impl<'a, T: Transaction> Binder<'a, T> { let predicate = self.bind_expr(predicate)?; if let Some(sub_queries) = self.context.sub_queries_at_now() { - for mut sub_query in sub_queries { + for sub_query in sub_queries { let mut on_keys: Vec<(ScalarExpression, ScalarExpression)> = vec![]; let mut filter = vec![]; + let (mut plan, join_ty) = match sub_query { + SubQueryType::SubQuery(plan) => (plan, JoinType::Inner), + SubQueryType::InSubQuery(is_not, plan) => { + let join_ty = if is_not { + JoinType::LeftAnti + } else { + JoinType::LeftSemi + }; + (plan, join_ty) + } + }; + Self::extract_join_keys( predicate.clone(), &mut on_keys, &mut filter, children.output_schema(), - sub_query.output_schema(), + plan.output_schema(), )?; // combine multiple filter exprs into one BinaryExpr @@ -499,12 +502,12 @@ impl<'a, T: Transaction> Binder<'a, T> { children = LJoinOperator::build( children, - sub_query, + plan, JoinCondition::On { on: on_keys, filter: join_filter, }, - JoinType::Inner, + join_ty, ); } return Ok(children); @@ -636,8 +639,8 @@ impl<'a, T: Transaction> Binder<'a, T> { fn bind_join_constraint( &mut self, - left_schema: &Schema, - right_schema: &Schema, + left_table: TableName, + right_table: TableName, 
constraint: &JoinConstraint, ) -> Result { match constraint { @@ -648,12 +651,21 @@ impl<'a, T: Transaction> Binder<'a, T> { let mut filter = vec![]; let expr = self.bind_expr(expr)?; + let left_table = self + .context + .table(left_table) + .ok_or(DatabaseError::TableNotFound)?; + let right_table = self + .context + .table(right_table) + .ok_or(DatabaseError::TableNotFound)?; + Self::extract_join_keys( expr, &mut on_keys, &mut filter, - left_schema, - right_schema, + left_table.schema_ref(), + right_table.schema_ref(), )?; // combine multiple filter exprs into one BinaryExpr @@ -679,11 +691,19 @@ impl<'a, T: Transaction> Binder<'a, T> { .find(|column| column.name() == lower_ident(ident)) .map(|column| ScalarExpression::ColumnRef(column.clone())) }; + let left_table = self + .context + .table(left_table) + .ok_or(DatabaseError::TableNotFound)?; + let right_table = self + .context + .table(right_table) + .ok_or(DatabaseError::TableNotFound)?; for ident in idents { if let (Some(left_column), Some(right_column)) = ( - fn_column(left_schema, ident), - fn_column(right_schema, ident), + fn_column(left_table.schema_ref(), ident), + fn_column(right_table.schema_ref(), ident), ) { on_keys.push((left_column, right_column)); } else { @@ -726,7 +746,7 @@ impl<'a, T: Transaction> Binder<'a, T> { fn_contains(left_schema, summary) || fn_contains(right_schema, summary) }; - match expr { + match expr.unpack_alias() { ScalarExpression::Binary { left_expr, right_expr, @@ -735,7 +755,7 @@ impl<'a, T: Transaction> Binder<'a, T> { } => { match op { BinaryOperator::Eq => { - match (left_expr.as_ref(), right_expr.as_ref()) { + match (left_expr.unpack_alias_ref(), right_expr.unpack_alias_ref()) { // example: foo = bar (ScalarExpression::ColumnRef(l), ScalarExpression::ColumnRef(r)) => { // reorder left and right joins keys to pattern: (left, right) @@ -819,7 +839,7 @@ impl<'a, T: Transaction> Binder<'a, T> { } } } - _ => { + expr => { if expr .referenced_columns(true) .iter() diff --git a/src/binder/update.rs b/src/binder/update.rs index 3172878b..7a0b2fe6 100644 --- a/src/binder/update.rs +++ b/src/binder/update.rs @@ -28,7 +28,7 @@ impl<'a, T: Transaction> Binder<'a, T> { plan = self.bind_where(plan, predicate)?; } - let mut columns = Vec::with_capacity(assignments.len()); + let mut schema = Vec::with_capacity(assignments.len()); let mut row = Vec::with_capacity(assignments.len()); for Assignment { id, value } in assignments { @@ -61,14 +61,14 @@ impl<'a, T: Transaction> Binder<'a, T> { } _ => return Err(DatabaseError::UnsupportedStmt(value.to_string())), } - columns.push(column); + schema.push(column); } _ => return Err(DatabaseError::InvalidColumn(ident.to_string())), } } } self.context.allow_default = false; - let values_plan = self.bind_values(vec![row], Arc::new(columns)); + let values_plan = self.bind_values(vec![row], Arc::new(schema)); Ok(LogicalPlan::new( Operator::Update(UpdateOperator { table_name }), diff --git a/src/catalog/table.rs b/src/catalog/table.rs index 293f3792..ead2e16b 100644 --- a/src/catalog/table.rs +++ b/src/catalog/table.rs @@ -115,18 +115,20 @@ impl TableCatalog { column_ids: Vec, is_unique: bool, is_primary: bool, - ) -> &IndexMeta { + ) -> Result<&IndexMeta, DatabaseError> { let index_id = self.indexes.last().map(|index| index.id + 1).unwrap_or(0); + let pk_ty = *self.primary_key()?.1.datatype(); let index = IndexMeta { id: index_id, column_ids, table_name: self.name.clone(), + pk_ty, name, is_unique, is_primary, }; self.indexes.push(Arc::new(index)); - 
self.indexes.last().unwrap() + Ok(self.indexes.last().unwrap()) } pub(crate) fn new( diff --git a/src/db.rs b/src/db.rs index 76e6f030..02a3dbc2 100644 --- a/src/db.rs +++ b/src/db.rs @@ -14,7 +14,7 @@ use crate::parser::parse_sql; use crate::planner::LogicalPlan; use crate::storage::kip::KipStorage; use crate::storage::{Storage, Transaction}; -use crate::types::tuple::Tuple; +use crate::types::tuple::{SchemaRef, Tuple}; pub(crate) type Functions = HashMap>; @@ -64,7 +64,7 @@ impl Database { &self, sql: S, query_execute: QueryExecute, - ) -> Result, DatabaseError> { + ) -> Result<(SchemaRef, Vec), DatabaseError> { match query_execute { QueryExecute::Volcano => self.run(sql).await, QueryExecute::Codegen => { @@ -98,7 +98,10 @@ impl Database { impl Database { /// Run SQL queries. - pub async fn run>(&self, sql: T) -> Result, DatabaseError> { + pub async fn run>( + &self, + sql: T, + ) -> Result<(SchemaRef, Vec), DatabaseError> { let transaction = self.storage.transaction().await?; let plan = Self::build_plan::(sql, &transaction, &self.functions)?; @@ -107,15 +110,16 @@ impl Database { pub(crate) async fn run_volcano( mut transaction: ::TransactionType, - plan: LogicalPlan, - ) -> Result, DatabaseError> { + mut plan: LogicalPlan, + ) -> Result<(SchemaRef, Vec), DatabaseError> { + let schema = plan.output_schema().clone(); let mut stream = build_write(plan, &mut transaction); let tuples = try_collect(&mut stream).await?; drop(stream); transaction.commit().await?; - Ok(tuples) + Ok((schema, tuples)) } pub async fn new_transaction(&self) -> Result, DatabaseError> { @@ -238,12 +242,16 @@ pub struct DBTransaction { } impl DBTransaction { - pub async fn run>(&mut self, sql: T) -> Result, DatabaseError> { - let plan = + pub async fn run>( + &mut self, + sql: T, + ) -> Result<(SchemaRef, Vec), DatabaseError> { + let mut plan = Database::::build_plan::(sql, &self.inner, &self.functions)?; + let schema = plan.output_schema().clone(); let mut stream = build_write(plan, &mut self.inner); - try_collect(&mut stream).await + Ok((schema, try_collect(&mut stream).await?)) } pub async fn commit(self) -> Result<(), DatabaseError> { @@ -255,8 +263,8 @@ impl DBTransaction { #[cfg(test)] mod test { - use crate::catalog::{ColumnCatalog, ColumnDesc}; - use crate::db::{DataBaseBuilder, DatabaseError, QueryExecute}; + use crate::catalog::{ColumnCatalog, ColumnDesc, ColumnRef}; + use crate::db::{DataBaseBuilder, DatabaseError}; use crate::expression::function::{FuncMonotonicity, FunctionSummary, ScalarFunctionImpl}; use crate::expression::ScalarExpression; use crate::expression::{BinaryOperator, UnaryOperator}; @@ -318,8 +326,8 @@ mod test { let _ = fnck_sql .run("INSERT INTO test VALUES (1, 2, 2), (0, 1, 1), (2, 1, 1), (3, 3, 3);") .await?; - let tuples = fnck_sql.run("select test(c1, 1) from test").await?; - println!("{}", create_table(&tuples)); + let (schema, tuples) = fnck_sql.run("select test(c1, 1) from test").await?; + println!("{}", create_table(&schema, &tuples)); Ok(()) } @@ -345,8 +353,8 @@ mod test { let _ = tx_2.run("insert into t1 values(2, 2)").await?; let _ = tx_2.run("insert into t1 values(3, 3)").await?; - let tuples_1 = tx_1.run("select * from t1").await?; - let tuples_2 = tx_2.run("select * from t1").await?; + let (_, tuples_1) = tx_1.run("select * from t1").await?; + let (_, tuples_2) = tx_2.run("select * from t1").await?; assert_eq!(tuples_1.len(), 2); assert_eq!(tuples_2.len(), 2); @@ -387,337 +395,4 @@ mod test { Ok(()) } - - #[tokio::test] - async fn test_crud_sql() -> Result<(), 
DatabaseError> { - #[cfg(not(feature = "codegen_execute"))] - { - let _ = crate::db::test::_test_crud_sql(QueryExecute::Volcano).await?; - } - #[cfg(feature = "codegen_execute")] - { - let mut results_1 = _test_crud_sql(QueryExecute::Volcano).await?; - let mut results_2 = _test_crud_sql(QueryExecute::Codegen).await?; - - assert_eq!(results_1.len(), results_2.len()); - - for i in 0..results_1.len() { - results_1[i].sort_by_key(|tuple: &Tuple| tuple.serialize_to()); - results_2[i].sort_by_key(|tuple: &Tuple| tuple.serialize_to()); - - if results_1[i] != results_2[i] { - panic!( - "Index: {i} Tuples not match! \n Volcano: \n{}\n Codegen: \n{}", - create_table(&results_1[i]), - create_table(&results_2[i]) - ); - } - } - } - - Ok(()) - } - - async fn _test_crud_sql(query_execute: QueryExecute) -> Result>, DatabaseError> { - let mut results = Vec::new(); - let temp_dir = TempDir::new().expect("unable to create temporary working directory"); - let fnck_sql = DataBaseBuilder::path(temp_dir.path()).build().await?; - - let _ = fnck_sql.run_on_query("create table t1 (a int primary key, b int unique null, k int, z varchar unique null)", query_execute).await?; - let _ = fnck_sql - .run_on_query( - "create table t2 (c int primary key, d int unsigned null, e datetime)", - query_execute, - ) - .await?; - let _ = fnck_sql.run_on_query("insert into t1 (a, b, k, z) values (-99, 1, 1, 'k'), (-1, 2, 2, 'i'), (5, 3, 2, 'p'), (29, 4, 2, 'db')", query_execute).await?; - let _ = fnck_sql.run_on_query("insert into t2 (d, c, e) values (2, 1, '2021-05-20 21:00:00'), (3, 4, '2023-09-10 00:00:00')", query_execute).await?; - let _ = fnck_sql - .run_on_query( - "create table t3 (a int primary key, b decimal(4,2))", - query_execute, - ) - .await?; - let _ = fnck_sql - .run_on_query( - "insert into t3 (a, b) values (1, 99), (2, 2.01), (3, 3.00)", - query_execute, - ) - .await?; - let _ = fnck_sql - .run_on_query( - "insert into t3 (a, b) values (4, 44), (5, 52), (6, 1.00)", - query_execute, - ) - .await?; - - println!("show tables:"); - let tuples_show_tables = fnck_sql.run_on_query("show tables", query_execute).await?; - println!("{}", create_table(&tuples_show_tables)); - results.push(tuples_show_tables); - - println!("full t1:"); - let tuples_full_fields_t1 = fnck_sql - .run_on_query("select * from t1", query_execute) - .await?; - println!("{}", create_table(&tuples_full_fields_t1)); - results.push(tuples_full_fields_t1); - - println!("full t2:"); - let tuples_full_fields_t2 = fnck_sql - .run_on_query("select * from t2", query_execute) - .await?; - println!("{}", create_table(&tuples_full_fields_t2)); - results.push(tuples_full_fields_t2); - - println!("projection_and_filter:"); - let tuples_projection_and_filter = fnck_sql - .run_on_query("select a from t1 where b > 1", query_execute) - .await?; - println!("{}", create_table(&tuples_projection_and_filter)); - results.push(tuples_projection_and_filter); - - println!("projection_and_sort:"); - let tuples_projection_and_sort = fnck_sql - .run_on_query("select * from t1 order by a, b", query_execute) - .await?; - println!("{}", create_table(&tuples_projection_and_sort)); - results.push(tuples_projection_and_sort); - - println!("like t1 1:"); - let tuples_like_1_t1 = fnck_sql - .run_on_query("select * from t1 where z like '%k'", query_execute) - .await?; - println!("{}", create_table(&tuples_like_1_t1)); - results.push(tuples_like_1_t1); - - println!("like t1 2:"); - let tuples_like_2_t1 = fnck_sql - .run_on_query("select * from t1 where z like '_b'", query_execute) - 
.await?; - println!("{}", create_table(&tuples_like_2_t1)); - results.push(tuples_like_2_t1); - - println!("not like t1:"); - let tuples_not_like_t1 = fnck_sql - .run_on_query("select * from t1 where z not like '_b'", query_execute) - .await?; - println!("{}", create_table(&tuples_not_like_t1)); - results.push(tuples_not_like_t1); - - println!("in t1:"); - let tuples_in_t1 = fnck_sql - .run_on_query("select * from t1 where a in (5, 29)", query_execute) - .await?; - println!("{}", create_table(&tuples_in_t1)); - results.push(tuples_in_t1); - - println!("not in t1:"); - let tuples_not_in_t1 = fnck_sql - .run_on_query("select * from t1 where a not in (5, 29)", query_execute) - .await?; - println!("{}", create_table(&tuples_not_in_t1)); - results.push(tuples_not_in_t1); - - println!("limit:"); - let tuples_limit = fnck_sql - .run_on_query("select * from t1 limit 1 offset 1", query_execute) - .await?; - println!("{}", create_table(&tuples_limit)); - results.push(tuples_limit); - - println!("inner join:"); - let tuples_inner_join = fnck_sql - .run_on_query("select * from t1 inner join t2 on a = c", query_execute) - .await?; - println!("{}", create_table(&tuples_inner_join)); - results.push(tuples_inner_join); - - println!("left join:"); - let tuples_left_join = fnck_sql - .run_on_query("select * from t1 left join t2 on a = c", query_execute) - .await?; - println!("{}", create_table(&tuples_left_join)); - results.push(tuples_left_join); - - println!("right join:"); - let tuples_right_join = fnck_sql - .run_on_query("select * from t1 right join t2 on a = c", query_execute) - .await?; - println!("{}", create_table(&tuples_right_join)); - results.push(tuples_right_join); - - println!("full join:"); - let tuples_full_join = fnck_sql - .run_on_query("select * from t1 full join t2 on b = c", query_execute) - .await?; - println!("{}", create_table(&tuples_full_join)); - results.push(tuples_full_join); - - println!("count agg:"); - let tuples_count_agg = fnck_sql - .run_on_query("select count(d) from t2", query_execute) - .await?; - println!("{}", create_table(&tuples_count_agg)); - results.push(tuples_count_agg); - - println!("count wildcard agg:"); - let tuples_count_wildcard_agg = fnck_sql - .run_on_query("select count(*) from t2", query_execute) - .await?; - println!("{}", create_table(&tuples_count_wildcard_agg)); - results.push(tuples_count_wildcard_agg); - - println!("count distinct agg:"); - let tuples_count_distinct_agg = fnck_sql - .run_on_query("select count(distinct d) from t2", query_execute) - .await?; - println!("{}", create_table(&tuples_count_distinct_agg)); - results.push(tuples_count_distinct_agg); - - println!("sum agg:"); - let tuples_sum_agg = fnck_sql - .run_on_query("select sum(d) from t2", query_execute) - .await?; - println!("{}", create_table(&tuples_sum_agg)); - results.push(tuples_sum_agg); - - println!("sum distinct agg:"); - let tuples_sum_distinct_agg = fnck_sql - .run_on_query("select sum(distinct d) from t2", query_execute) - .await?; - println!("{}", create_table(&tuples_sum_distinct_agg)); - results.push(tuples_sum_distinct_agg); - - println!("avg agg:"); - let tuples_avg_agg = fnck_sql - .run_on_query("select avg(d) from t2", query_execute) - .await?; - println!("{}", create_table(&tuples_avg_agg)); - results.push(tuples_avg_agg); - - println!("min_max agg:"); - let tuples_min_max_agg = fnck_sql - .run_on_query("select min(d), max(d) from t2", query_execute) - .await?; - println!("{}", create_table(&tuples_min_max_agg)); - results.push(tuples_min_max_agg); - - 
println!("group agg:"); - let tuples_group_agg = fnck_sql - .run_on_query( - "select c, max(d) from t2 group by c having c = 1", - query_execute, - ) - .await?; - println!("{}", create_table(&tuples_group_agg)); - - println!("alias:"); - let tuples_group_agg = fnck_sql - .run_on_query("select c as o from t2", query_execute) - .await?; - println!("{}", create_table(&tuples_group_agg)); - results.push(tuples_group_agg); - - println!("alias agg:"); - let tuples_group_agg = fnck_sql - .run_on_query( - "select c, max(d) as max_d from t2 group by c having c = 1", - query_execute, - ) - .await?; - println!("{}", create_table(&tuples_group_agg)); - results.push(tuples_group_agg); - - println!("time max:"); - let tuples_time_max = fnck_sql - .run_on_query("select max(e) as max_time from t2", query_execute) - .await?; - println!("{}", create_table(&tuples_time_max)); - results.push(tuples_time_max); - - println!("time where:"); - let tuples_time_where_t2 = fnck_sql - .run_on_query( - "select (c + 1) from t2 where e > '2021-05-20'", - query_execute, - ) - .await?; - println!("{}", create_table(&tuples_time_where_t2)); - results.push(tuples_time_where_t2); - - assert!(fnck_sql - .run_on_query("select max(d) from t2 group by c", query_execute) - .await - .is_err()); - - println!("distinct t1:"); - let tuples_distinct_t1 = fnck_sql - .run_on_query("select distinct b, k from t1", query_execute) - .await?; - println!("{}", create_table(&tuples_distinct_t1)); - results.push(tuples_distinct_t1); - - println!("update t1 with filter:"); - let _ = fnck_sql - .run_on_query("update t1 set b = 0 where b = 1", query_execute) - .await?; - println!("after t1:"); - - let update_after_full_t1 = fnck_sql - .run_on_query("select * from t1", query_execute) - .await?; - println!("{}", create_table(&update_after_full_t1)); - results.push(update_after_full_t1); - - println!("insert overwrite t1:"); - let _ = fnck_sql - .run_on_query( - "insert overwrite t1 (a, b, k) values (-99, 1, 0)", - query_execute, - ) - .await?; - println!("after t1:"); - let insert_overwrite_after_full_t1 = fnck_sql - .run_on_query("select * from t1", query_execute) - .await?; - println!("{}", create_table(&insert_overwrite_after_full_t1)); - results.push(insert_overwrite_after_full_t1); - - assert!(fnck_sql - .run_on_query( - "insert overwrite t1 (a, b, k) values (-1, 1, 0)", - query_execute - ) - .await - .is_err()); - - println!("delete t1 with filter:"); - let _ = fnck_sql - .run_on_query("delete from t1 where b = 0", query_execute) - .await?; - println!("after t1:"); - let delete_after_full_t1 = fnck_sql - .run_on_query("select * from t1", query_execute) - .await?; - println!("{}", create_table(&delete_after_full_t1)); - results.push(delete_after_full_t1); - - println!("trun_on_querycate t1:"); - let _ = fnck_sql.run_on_query("truncate t1", query_execute).await?; - - println!("drop t1:"); - let _ = fnck_sql - .run_on_query("drop table t1", query_execute) - .await?; - - println!("decimal:"); - let tuples_decimal = fnck_sql - .run_on_query("select * from t3", query_execute) - .await?; - println!("{}", create_table(&tuples_decimal)); - results.push(tuples_decimal); - - Ok(results) - } } diff --git a/src/execution/volcano/ddl/add_column.rs b/src/execution/volcano/ddl/add_column.rs index 342fc6a9..a2fa63f4 100644 --- a/src/execution/volcano/ddl/add_column.rs +++ b/src/execution/volcano/ddl/add_column.rs @@ -36,19 +36,12 @@ impl AddColumn { if_not_exists, } = &self.op; let mut unique_values = column.desc().is_unique.then(Vec::new); - let mut 
tuple_columns = None; let mut tuples = Vec::new(); #[for_await] for tuple in build_read(self.input, transaction) { let mut tuple: Tuple = tuple?; - let tuples_columns = tuple_columns.get_or_insert_with(|| { - let mut columns = Vec::clone(&tuple.schema_ref); - - columns.push(Arc::new(column.clone())); - Arc::new(columns) - }); if let Some(value) = column.default_value() { if let Some(unique_values) = &mut unique_values { unique_values.push((tuple.id.clone().unwrap(), value.clone())); @@ -57,7 +50,6 @@ impl AddColumn { } else { tuple.values.push(Arc::new(DataValue::Null)); } - tuple.schema_ref = tuples_columns.clone(); tuples.push(tuple); } for tuple in tuples { @@ -78,10 +70,10 @@ impl AddColumn { id: unique_meta.id, column_values: vec![value], }; - transaction.add_index(table_name, index, vec![tuple_id], true)?; + transaction.add_index(table_name, index, &tuple_id, true)?; } } - yield TupleBuilder::build_result("ALTER TABLE SUCCESS".to_string(), "1".to_string())?; + yield TupleBuilder::build_result("1".to_string()); } } diff --git a/src/execution/volcano/ddl/create_table.rs b/src/execution/volcano/ddl/create_table.rs index 86c178d2..cf49b71a 100644 --- a/src/execution/volcano/ddl/create_table.rs +++ b/src/execution/volcano/ddl/create_table.rs @@ -32,9 +32,6 @@ impl CreateTable { } = self.op; let _ = transaction.create_table(table_name.clone(), columns, if_not_exists)?; - yield TupleBuilder::build_result( - "CREATE TABLE SUCCESS".to_string(), - format!("{}", table_name), - )?; + yield TupleBuilder::build_result(format!("{}", table_name)); } } diff --git a/src/execution/volcano/ddl/drop_column.rs b/src/execution/volcano/ddl/drop_column.rs index 09778674..0fc51f07 100644 --- a/src/execution/volcano/ddl/drop_column.rs +++ b/src/execution/volcano/ddl/drop_column.rs @@ -6,7 +6,6 @@ use crate::storage::Transaction; use crate::types::tuple::Tuple; use crate::types::tuple_builder::TupleBuilder; use futures_async_stream::try_stream; -use std::sync::Arc; pub struct DropColumn { op: DropColumnOperator, @@ -27,55 +26,44 @@ impl WriteExecutor for DropColumn { impl DropColumn { #[try_stream(boxed, ok = Tuple, error = DatabaseError)] - async fn _execute(self, transaction: &mut T) { + async fn _execute(mut self, transaction: &mut T) { let DropColumnOperator { table_name, column_name, if_exists, - } = &self.op; - let mut tuple_columns = None; - let mut tuples = Vec::new(); + } = self.op; + let tuple_columns = self.input.output_schema(); - #[for_await] - for tuple in build_read(self.input, transaction) { - let mut tuple: Tuple = tuple?; + if let Some((column_index, is_primary)) = tuple_columns + .iter() + .enumerate() + .find(|(_, column)| column.name() == column_name) + .map(|(i, column)| (i, column.desc.is_primary)) + { + if is_primary { + Err(DatabaseError::InvalidColumn( + "drop of primary key column is not allowed.".to_owned(), + ))?; + } + let mut tuples = Vec::new(); - if tuple_columns.is_none() { - if let Some((column_index, is_primary)) = tuple - .schema_ref - .iter() - .enumerate() - .find(|(_, column)| column.name() == column_name) - .map(|(i, column)| (i, column.desc.is_primary)) - { - if is_primary { - Err(DatabaseError::InvalidColumn( - "drop of primary key column is not allowed.".to_owned(), - ))?; - } - let mut columns = Vec::clone(&tuple.schema_ref); - let _ = columns.remove(column_index); + #[for_await] + for tuple in build_read(self.input, transaction) { + let mut tuple: Tuple = tuple?; + let _ = tuple.values.remove(column_index); - tuple_columns = Some((column_index, Arc::new(columns))); 
- } + tuples.push(tuple); } - if tuple_columns.is_none() && *if_exists { - return Ok(()); + for tuple in tuples { + transaction.append(&table_name, tuple, true)?; } - let (column_i, columns) = tuple_columns - .clone() - .ok_or_else(|| DatabaseError::InvalidColumn("not found column".to_string()))?; - - tuple.schema_ref = columns; - let _ = tuple.values.remove(column_i); + transaction.drop_column(&table_name, &column_name)?; - tuples.push(tuple); + yield TupleBuilder::build_result("1".to_string()); + } else if if_exists { + return Ok(()); + } else { + return Err(DatabaseError::NotFound("drop column", column_name)); } - for tuple in tuples { - transaction.append(table_name, tuple, true)?; - } - transaction.drop_column(table_name, column_name, *if_exists)?; - - yield TupleBuilder::build_result("ALTER TABLE SUCCESS".to_string(), "1".to_string())?; } } diff --git a/src/execution/volcano/ddl/drop_table.rs b/src/execution/volcano/ddl/drop_table.rs index ea89a61e..f80d8fa2 100644 --- a/src/execution/volcano/ddl/drop_table.rs +++ b/src/execution/volcano/ddl/drop_table.rs @@ -3,6 +3,7 @@ use crate::execution::volcano::{BoxedExecutor, WriteExecutor}; use crate::planner::operator::drop_table::DropTableOperator; use crate::storage::Transaction; use crate::types::tuple::Tuple; +use crate::types::tuple_builder::TupleBuilder; use futures_async_stream::try_stream; pub struct DropTable { @@ -28,7 +29,8 @@ impl DropTable { table_name, if_exists, } = self.op; - transaction.drop_table(&table_name, if_exists)?; + + yield TupleBuilder::build_result(format!("{}", table_name)); } } diff --git a/src/execution/volcano/ddl/truncate.rs b/src/execution/volcano/ddl/truncate.rs index 4714f37c..4d72eff8 100644 --- a/src/execution/volcano/ddl/truncate.rs +++ b/src/execution/volcano/ddl/truncate.rs @@ -3,6 +3,7 @@ use crate::execution::volcano::{BoxedExecutor, WriteExecutor}; use crate::planner::operator::truncate::TruncateOperator; use crate::storage::Transaction; use crate::types::tuple::Tuple; +use crate::types::tuple_builder::TupleBuilder; use futures_async_stream::try_stream; pub struct Truncate { @@ -27,5 +28,7 @@ impl Truncate { let TruncateOperator { table_name } = self.op; transaction.drop_data(&table_name)?; + + yield TupleBuilder::build_result(format!("{}", table_name)); } } diff --git a/src/execution/volcano/dml/analyze.rs b/src/execution/volcano/dml/analyze.rs index b5bf6031..ceb56ecb 100644 --- a/src/execution/volcano/dml/analyze.rs +++ b/src/execution/volcano/dml/analyze.rs @@ -1,4 +1,4 @@ -use crate::catalog::{ColumnCatalog, ColumnRef, TableMeta, TableName}; +use crate::catalog::{ColumnRef, TableMeta, TableName}; use crate::errors::DatabaseError; use crate::execution::volcano::{build_read, BoxedExecutor, WriteExecutor}; use crate::optimizer::core::column_meta::ColumnMeta; @@ -54,10 +54,11 @@ impl Analyze { pub async fn _execute(self, transaction: &mut T) { let Analyze { table_name, - input, + mut input, columns, } = self; + let schema = input.output_schema().clone(); let mut builders = HashMap::with_capacity(columns.len()); for column in &columns { @@ -66,11 +67,9 @@ impl Analyze { #[for_await] for tuple in build_read(input, transaction) { - let Tuple { - schema_ref, values, .. - } = tuple?; + let Tuple { values, .. 
} = tuple?; - for (i, column) in schema_ref.iter().enumerate() { + for (i, column) in schema.iter().enumerate() { if !column.desc.is_index() { continue; } @@ -103,20 +102,13 @@ impl Analyze { } transaction.save_table_meta(&meta)?; - let columns: Vec = vec![Arc::new(ColumnCatalog::new_dummy( - "COLUMN_META_PATH".to_string(), - ))]; let values = meta .colum_meta_paths .into_iter() .map(|path| Arc::new(DataValue::Utf8(Some(path)))) .collect_vec(); - yield Tuple { - id: None, - schema_ref: Arc::new(columns), - values, - }; + yield Tuple { id: None, values }; } } diff --git a/src/execution/volcano/dml/copy_from_file.rs b/src/execution/volcano/dml/copy_from_file.rs index ad82a693..92e0d4c3 100644 --- a/src/execution/volcano/dml/copy_from_file.rs +++ b/src/execution/volcano/dml/copy_from_file.rs @@ -95,10 +95,7 @@ impl CopyFromFile { } fn return_result(size: usize, tx: Sender) -> Result<(), DatabaseError> { - let tuple = TupleBuilder::build_result( - "COPY FROM SOURCE".to_string(), - format!("import {} rows", size), - )?; + let tuple = TupleBuilder::build_result(format!("import {} rows", size)); tx.blocking_send(tuple) .map_err(|_| DatabaseError::ChannelClose)?; @@ -190,11 +187,7 @@ mod tests { .unwrap()?; assert_eq!( tuple, - TupleBuilder::build_result( - "COPY FROM SOURCE".to_string(), - format!("import {} rows", 2) - ) - .unwrap() + TupleBuilder::build_result(format!("import {} rows", 2)) ); Ok(()) diff --git a/src/execution/volcano/dml/insert.rs b/src/execution/volcano/dml/insert.rs index 8cde6afa..77b8b05b 100644 --- a/src/execution/volcano/dml/insert.rs +++ b/src/execution/volcano/dml/insert.rs @@ -6,7 +6,6 @@ use crate::planner::LogicalPlan; use crate::storage::Transaction; use crate::types::index::Index; use crate::types::tuple::Tuple; -use crate::types::tuple_builder::TupleBuilder; use crate::types::value::DataValue; use futures_async_stream::try_stream; use std::collections::HashMap; @@ -47,35 +46,32 @@ impl Insert { pub async fn _execute(self, transaction: &mut T) { let Insert { table_name, - input, + mut input, is_overwrite, } = self; - let mut primary_key_id = None; let mut unique_values = HashMap::new(); let mut tuple_values = Vec::new(); + let schema = input.output_schema().clone(); - if let Some(table_catalog) = transaction.table(table_name.clone()).cloned() { - let tuple_schema_ref = table_catalog.schema_ref(); + let pk_index = schema + .iter() + .find(|col| col.desc.is_primary) + .map(|col| col.id()) + .ok_or_else(|| DatabaseError::NotNull)?; + if let Some(table_catalog) = transaction.table(table_name.clone()).cloned() { #[for_await] for tuple in build_read(input, transaction) { - let Tuple { - schema_ref, values, .. - } = tuple?; + let Tuple { values, .. 
} = tuple?; - if primary_key_id.is_none() { - let id = schema_ref - .iter() - .find(|col| col.desc.is_primary) - .map(|col| col.id()) - .ok_or_else(|| DatabaseError::NotNull)?; - primary_key_id = Some(id); - } let mut tuple_map = HashMap::new(); for (i, value) in values.into_iter().enumerate() { - tuple_map.insert(schema_ref[i].id(), value); + tuple_map.insert(schema[i].id(), value); } - let tuple_id = tuple_map.get(&primary_key_id.unwrap()).cloned().unwrap(); + let tuple_id = tuple_map + .get(&pk_index) + .cloned() + .ok_or(DatabaseError::NotNull)?; let mut values = Vec::with_capacity(table_catalog.columns_len()); for col in table_catalog.columns() { @@ -97,7 +93,6 @@ impl Insert { } tuple_values.push((tuple_id, values)); } - let tuple_builder = TupleBuilder::new(tuple_schema_ref); // Unique Index for (col_id, values) in unique_values { @@ -108,14 +103,19 @@ impl Insert { column_values: vec![value], }; - transaction.add_index(&table_name, index, vec![tuple_id], true)?; + transaction.add_index(&table_name, index, &tuple_id, true)?; } } } for (tuple_id, values) in tuple_values { - let tuple = tuple_builder.build(Some(tuple_id), values)?; - - transaction.append(&table_name, tuple, is_overwrite)?; + transaction.append( + &table_name, + Tuple { + id: Some(tuple_id), + values, + }, + is_overwrite, + )?; } } } diff --git a/src/execution/volcano/dml/update.rs b/src/execution/volcano/dml/update.rs index 72677cad..87428ad3 100644 --- a/src/execution/volcano/dml/update.rs +++ b/src/execution/volcano/dml/update.rs @@ -38,9 +38,11 @@ impl Update { pub async fn _execute(self, transaction: &mut T) { let Update { table_name, - input, - values, + mut input, + mut values, } = self; + let values_schema = values.output_schema().clone(); + let input_schema = input.output_schema().clone(); if let Some(table_catalog) = transaction.table(table_name.clone()).cloned() { let mut value_map = HashMap::new(); @@ -49,13 +51,9 @@ impl Update { // only once #[for_await] for tuple in build_read(values, transaction) { - let Tuple { - schema_ref: columns, - values, - .. - } = tuple?; - for i in 0..columns.len() { - value_map.insert(columns[i].id(), values[i].clone()); + let Tuple { values, .. 
} = tuple?; + for i in 0..values.len() { + value_map.insert(values_schema[i].id(), values[i].clone()); } } #[for_await] @@ -68,7 +66,7 @@ impl Update { for mut tuple in tuples { let mut is_overwrite = true; - for (i, column) in tuple.schema_ref.iter().enumerate() { + for (i, column) in input_schema.iter().enumerate() { if let Some(value) = value_map.get(&column.id()) { if column.desc.is_primary { let old_key = tuple.id.replace(value.clone()).unwrap(); @@ -91,7 +89,7 @@ impl Update { transaction.add_index( &table_name, index, - vec![tuple.id.clone().unwrap()], + tuple.id.as_ref().unwrap(), true, )?; } diff --git a/src/execution/volcano/dql/aggregate/avg.rs b/src/execution/volcano/dql/aggregate/avg.rs index 623e03eb..ac31a15d 100644 --- a/src/execution/volcano/dql/aggregate/avg.rs +++ b/src/execution/volcano/dql/aggregate/avg.rs @@ -38,6 +38,9 @@ impl Accumulator for AvgAccumulator { } else { DataValue::UInt32(Some(self.count as u32)) }; + if self.count == 0 { + return Ok(Arc::new(DataValue::init(&value.logical_type()))); + } Ok(Arc::new(DataValue::binary_op( &value, diff --git a/src/execution/volcano/dql/aggregate/hash_agg.rs b/src/execution/volcano/dql/aggregate/hash_agg.rs index 76d397cd..90a63dc4 100644 --- a/src/execution/volcano/dql/aggregate/hash_agg.rs +++ b/src/execution/volcano/dql/aggregate/hash_agg.rs @@ -6,13 +6,11 @@ use crate::expression::ScalarExpression; use crate::planner::operator::aggregate::AggregateOperator; use crate::planner::LogicalPlan; use crate::storage::Transaction; -use crate::types::tuple::Tuple; +use crate::types::tuple::{SchemaRef, Tuple}; use crate::types::value::ValueRef; use ahash::HashMap; use futures_async_stream::try_stream; use itertools::Itertools; -use std::mem; -use std::sync::Arc; pub struct HashAggExecutor { agg_calls: Vec, @@ -46,6 +44,8 @@ impl ReadExecutor for HashAggExecutor { } pub(crate) struct HashAggStatus { + schema_ref: SchemaRef, + agg_calls: Vec, groupby_exprs: Vec, @@ -55,10 +55,12 @@ pub(crate) struct HashAggStatus { impl HashAggStatus { pub(crate) fn new( + schema_ref: SchemaRef, agg_calls: Vec, groupby_exprs: Vec, ) -> Self { HashAggStatus { + schema_ref, agg_calls, groupby_exprs, group_columns: vec![], @@ -84,7 +86,7 @@ impl HashAggStatus { .iter() .map(|expr| { if let ScalarExpression::AggCall { args, .. } = expr { - args[0].eval(&tuple) + args[0].eval(&tuple, &self.schema_ref) } else { unreachable!() } @@ -94,7 +96,7 @@ impl HashAggStatus { let group_keys: Vec = self .groupby_exprs .iter() - .map(|expr| expr.eval(&tuple)) + .map(|expr| expr.eval(&tuple, &self.schema_ref)) .try_collect()?; for (acc, value) in self @@ -111,8 +113,6 @@ impl HashAggStatus { } pub(crate) fn as_tuples(&mut self) -> Result, DatabaseError> { - let group_columns = Arc::new(mem::take(&mut self.group_columns)); - self.group_hash_accs .drain() .map(|(group_keys, accs)| { @@ -123,11 +123,7 @@ impl HashAggStatus { .chain(group_keys.into_iter().map(Ok)) .try_collect()?; - Ok::(Tuple { - id: None, - schema_ref: group_columns.clone(), - values, - }) + Ok::(Tuple { id: None, values }) }) .try_collect() } @@ -139,13 +135,14 @@ impl HashAggExecutor { let HashAggExecutor { agg_calls, groupby_exprs, - .. 
+ mut input, } = self; - let mut agg_status = HashAggStatus::new(agg_calls, groupby_exprs); + let mut agg_status = + HashAggStatus::new(input.output_schema().clone(), agg_calls, groupby_exprs); #[for_await] - for tuple in build_read(self.input, transaction) { + for tuple in build_read(input, transaction) { agg_status.update(tuple?)?; } @@ -184,18 +181,18 @@ mod test { let transaction = storage.transaction().await?; let desc = ColumnDesc::new(LogicalType::Integer, false, false, None); - let t1_columns = vec![ + let t1_schema = Arc::new(vec![ Arc::new(ColumnCatalog::new("c1".to_string(), true, desc.clone())), Arc::new(ColumnCatalog::new("c2".to_string(), true, desc.clone())), Arc::new(ColumnCatalog::new("c3".to_string(), true, desc.clone())), - ]; + ]); let operator = AggregateOperator { - groupby_exprs: vec![ScalarExpression::ColumnRef(t1_columns[0].clone())], + groupby_exprs: vec![ScalarExpression::ColumnRef(t1_schema[0].clone())], agg_calls: vec![ScalarExpression::AggCall { distinct: false, kind: AggKind::Sum, - args: vec![ScalarExpression::ColumnRef(t1_columns[1].clone())], + args: vec![ScalarExpression::ColumnRef(t1_schema[1].clone())], ty: LogicalType::Integer, }], is_distinct: false, @@ -225,7 +222,7 @@ mod test { Arc::new(DataValue::Int32(Some(3))), ], ], - schema_ref: Arc::new(t1_columns), + schema_ref: t1_schema.clone(), }), childrens: vec![], physical_option: None, @@ -236,7 +233,13 @@ mod test { try_collect(&mut HashAggExecutor::from((operator, input)).execute(&transaction)) .await?; - println!("hash_agg_test: \n{}", create_table(&tuples)); + println!( + "hash_agg_test: \n{}", + create_table( + &Arc::new(vec![t1_schema[0].clone(), t1_schema[1].clone()]), + &tuples + ) + ); assert_eq!(tuples.len(), 2); diff --git a/src/execution/volcano/dql/aggregate/simple_agg.rs b/src/execution/volcano/dql/aggregate/simple_agg.rs index 795e7871..08092963 100644 --- a/src/execution/volcano/dql/aggregate/simple_agg.rs +++ b/src/execution/volcano/dql/aggregate/simple_agg.rs @@ -9,7 +9,6 @@ use crate::types::tuple::Tuple; use crate::types::value::ValueRef; use futures_async_stream::try_stream; use itertools::Itertools; -use std::sync::Arc; pub struct SimpleAggExecutor { agg_calls: Vec, @@ -33,25 +32,21 @@ impl ReadExecutor for SimpleAggExecutor { impl SimpleAggExecutor { #[try_stream(boxed, ok = Tuple, error = DatabaseError)] pub async fn _execute(self, transaction: &T) { - let mut accs = create_accumulators(&self.agg_calls); - let mut columns_option = None; + let SimpleAggExecutor { + agg_calls, + mut input, + } = self; + let mut accs = create_accumulators(&agg_calls); + let schema = input.output_schema().clone(); #[for_await] - for tuple in build_read(self.input, transaction) { + for tuple in build_read(input, transaction) { let tuple = tuple?; - columns_option.get_or_insert_with(|| { - self.agg_calls - .iter() - .map(|expr| expr.output_column()) - .collect_vec() - }); - - let values: Vec = self - .agg_calls + let values: Vec = agg_calls .iter() .map(|expr| match expr { - ScalarExpression::AggCall { args, .. } => args[0].eval(&tuple), + ScalarExpression::AggCall { args, .. 
} => args[0].eval(&tuple, &schema), _ => unreachable!(), }) .try_collect()?; @@ -60,15 +55,8 @@ impl SimpleAggExecutor { acc.update_value(value)?; } } + let values: Vec = accs.into_iter().map(|acc| acc.evaluate()).try_collect()?; - if let Some(columns) = columns_option { - let values: Vec = accs.into_iter().map(|acc| acc.evaluate()).try_collect()?; - - yield Tuple { - id: None, - schema_ref: Arc::new(columns), - values, - }; - } + yield Tuple { id: None, values }; } } diff --git a/src/execution/volcano/dql/describe.rs b/src/execution/volcano/dql/describe.rs index 32bdf871..62f5fa1b 100644 --- a/src/execution/volcano/dql/describe.rs +++ b/src/execution/volcano/dql/describe.rs @@ -40,13 +40,6 @@ impl Describe { let table = transaction .table(self.table_name.clone()) .ok_or(DatabaseError::TableNotFound)?; - let columns = Arc::new(vec![ - Arc::new(ColumnCatalog::new_dummy("FIELD".to_string())), - Arc::new(ColumnCatalog::new_dummy("TYPE".to_string())), - Arc::new(ColumnCatalog::new_dummy("NULL".to_string())), - Arc::new(ColumnCatalog::new_dummy("Key".to_string())), - Arc::new(ColumnCatalog::new_dummy("DEFAULT".to_string())), - ]); let key_fn = |column: &ColumnCatalog| { if column.desc.is_primary { PRIMARY_KEY_TYPE.clone() @@ -67,11 +60,7 @@ impl Describe { .default_value() .unwrap_or_else(|| Arc::new(DataValue::none(column.datatype()))), ]; - yield Tuple { - id: None, - schema_ref: columns.clone(), - values, - }; + yield Tuple { id: None, values }; } } } diff --git a/src/execution/volcano/dql/dummy.rs b/src/execution/volcano/dql/dummy.rs index 55670137..c215ff92 100644 --- a/src/execution/volcano/dql/dummy.rs +++ b/src/execution/volcano/dql/dummy.rs @@ -3,7 +3,6 @@ use crate::execution::volcano::{BoxedExecutor, ReadExecutor}; use crate::storage::Transaction; use crate::types::tuple::Tuple; use futures_async_stream::try_stream; -use std::sync::Arc; pub struct Dummy {} @@ -18,7 +17,6 @@ impl Dummy { pub async fn _execute(self) { yield Tuple { id: None, - schema_ref: Arc::new(vec![]), values: vec![], } } diff --git a/src/execution/volcano/dql/explain.rs b/src/execution/volcano/dql/explain.rs index 4e736783..913a38e6 100644 --- a/src/execution/volcano/dql/explain.rs +++ b/src/execution/volcano/dql/explain.rs @@ -1,4 +1,3 @@ -use crate::catalog::ColumnCatalog; use crate::errors::DatabaseError; use crate::execution::volcano::{BoxedExecutor, ReadExecutor}; use crate::planner::LogicalPlan; @@ -27,13 +26,8 @@ impl ReadExecutor for Explain { impl Explain { #[try_stream(boxed, ok = Tuple, error = DatabaseError)] pub async fn _execute(self) { - let schema_ref = Arc::new(vec![Arc::new(ColumnCatalog::new_dummy("PLAN".to_string()))]); let values = vec![Arc::new(DataValue::Utf8(Some(self.plan.explain(0))))]; - yield Tuple { - id: None, - schema_ref, - values, - }; + yield Tuple { id: None, values }; } } diff --git a/src/execution/volcano/dql/filter.rs b/src/execution/volcano/dql/filter.rs index 34eb1bc1..49a12dcc 100644 --- a/src/execution/volcano/dql/filter.rs +++ b/src/execution/volcano/dql/filter.rs @@ -27,13 +27,17 @@ impl ReadExecutor for Filter { impl Filter { #[try_stream(boxed, ok = Tuple, error = DatabaseError)] pub async fn _execute(self, transaction: &T) { - let Filter { predicate, input } = self; + let Filter { + predicate, + mut input, + } = self; + let schema = input.output_schema().clone(); #[for_await] for tuple in build_read(input, transaction) { let tuple = tuple?; - if predicate.eval(&tuple)?.is_true()? { + if predicate.eval(&tuple, &schema)?.is_true()? 
diff --git a/src/execution/volcano/dql/join/hash_join.rs b/src/execution/volcano/dql/join/hash_join.rs
index e4004277..96e28272 100644
--- a/src/execution/volcano/dql/join/hash_join.rs
+++ b/src/execution/volcano/dql/join/hash_join.rs
@@ -6,13 +6,12 @@ use crate::expression::ScalarExpression;
 use crate::planner::operator::join::{JoinCondition, JoinOperator, JoinType};
 use crate::planner::LogicalPlan;
 use crate::storage::Transaction;
-use crate::types::tuple::{SchemaRef, Tuple};
+use crate::types::tuple::{Schema, SchemaRef, Tuple};
 use crate::types::value::{DataValue, ValueRef, NULL_VALUE};
 use ahash::HashMap;
-use futures::stream::BoxStream;
-use futures::{stream, StreamExt};
 use futures_async_stream::try_stream;
 use itertools::Itertools;
+use kip_db::kernel::utils::bloom_filter::BitVector;
 use std::sync::Arc;
 
 pub struct HashJoin {
@@ -48,7 +47,7 @@ impl<T: Transaction> ReadExecutor<T> for HashJoin {
 pub(crate) struct HashJoinStatus {
     ty: JoinType,
     filter: Option<ScalarExpression>,
-    build_map: HashMap<Vec<ValueRef>, (Vec<Tuple>, bool)>,
+    build_map: HashMap<Vec<ValueRef>, (Vec<Tuple>, bool, bool)>,
 
     full_schema_ref: SchemaRef,
     left_schema_len: usize,
@@ -108,20 +107,24 @@ impl HashJoinStatus {
         let HashJoinStatus {
             on_left_keys,
             build_map,
+            full_schema_ref,
+            left_schema_len,
             ..
         } = self;
 
-        let values = Self::eval_keys(on_left_keys, &tuple)?;
+        let values = Self::eval_keys(on_left_keys, &tuple, &full_schema_ref[0..*left_schema_len])?;
 
         build_map
             .entry(values)
-            .or_insert_with(|| (Vec::new(), false))
+            .or_insert_with(|| (Vec::new(), false, false))
             .0
             .push(tuple);
 
         Ok(())
     }
 
-    pub(crate) fn right_probe(&mut self, tuple: Tuple) -> Result<Vec<Tuple>, DatabaseError> {
+    #[try_stream(boxed, ok = Tuple, error = DatabaseError)]
+    #[allow(unused_assignments)]
+    pub(crate) async fn right_probe(&mut self, tuple: Tuple) {
         let HashJoinStatus {
             on_right_keys,
             full_schema_ref,
@@ -132,15 +135,25 @@ impl HashJoinStatus {
             ..
         } = self;
 
-        let right_cols_len = tuple.schema_ref.len();
-        let values = Self::eval_keys(on_right_keys, &tuple)?;
+        let right_cols_len = tuple.values.len();
+        let values = Self::eval_keys(on_right_keys, &tuple, &full_schema_ref[*left_schema_len..])?;
 
-        let mut join_tuples = Vec::with_capacity(1);
-        if let Some((tuples, is_used)) = build_map.get_mut(&values) {
+        if let Some((tuples, is_used, is_filtered)) = build_map.get_mut(&values) {
+            let mut bits_option = None;
             *is_used = true;
 
-            join_tuples.reserve(tuples.len());
-            for Tuple { values, .. } in tuples {
+            match ty {
+                JoinType::LeftSemi => {
+                    if *is_filtered {
+                        return Ok(());
+                    } else {
+                        bits_option = Some(BitVector::new(tuples.len()));
+                    }
+                }
+                JoinType::LeftAnti => return Ok(()),
+                _ => (),
+            }
+            for (i, Tuple { values, .. }) in tuples.iter().enumerate() {
                 let full_values = values
                     .iter()
                     .cloned()
@@ -148,50 +161,60 @@ impl HashJoinStatus {
                     .collect_vec();
                 let tuple = Tuple {
                     id: None,
-                    schema_ref: full_schema_ref.clone(),
                     values: full_values,
                 };
-                if let Some(tuple) = Self::filter(tuple, filter, ty, *left_schema_len)? {
-                    join_tuples.push(tuple);
+                if let Some(tuple) =
+                    Self::filter(tuple, full_schema_ref, filter, ty, *left_schema_len)?
+                {
+                    if let Some(bits) = bits_option.as_mut() {
+                        bits.set_bit(i, true);
+                    } else {
+                        yield tuple;
+                    }
                 }
             }
-        } else if matches!(ty, JoinType::Right | JoinType::Full) {
+            if let Some(bits) = bits_option {
+                let mut cnt = 0;
+                tuples.retain(|_| {
+                    let res = bits.get_bit(cnt);
+                    cnt += 1;
+                    res
+                });
+                *is_filtered = true
+            }
+        } else if matches!(ty, JoinType::RightOuter | JoinType::Full) {
             let empty_len = full_schema_ref.len() - right_cols_len;
             let values = (0..empty_len)
                 .map(|_| NULL_VALUE.clone())
                 .chain(tuple.values)
                 .collect_vec();
-            let tuple = Tuple {
-                id: None,
-                schema_ref: full_schema_ref.clone(),
-                values,
-            };
-            if let Some(tuple) = Self::filter(tuple, filter, ty, *left_schema_len)? {
-                join_tuples.push(tuple);
+            let tuple = Tuple { id: None, values };
+            if let Some(tuple) = Self::filter(tuple, full_schema_ref, filter, ty, *left_schema_len)?
+            {
+                yield tuple;
             }
         }
-
-        Ok(join_tuples)
     }
 
     pub(crate) fn filter(
         mut tuple: Tuple,
+        schema: &Schema,
        filter: &Option<ScalarExpression>,
         join_ty: &JoinType,
         left_schema_len: usize,
     ) -> Result<Option<Tuple>, DatabaseError> {
         if let (Some(expr), false) = (filter, matches!(join_ty, JoinType::Full | JoinType::Cross)) {
-            match expr.eval(&tuple)?.as_ref() {
+            match expr.eval(&tuple, schema)?.as_ref() {
                 DataValue::Boolean(Some(false) | None) => {
-                    let full_schema_len = tuple.schema_ref.len();
+                    let full_schema_len = schema.len();
 
                     match join_ty {
-                        JoinType::Left => {
+                        JoinType::LeftOuter => {
                             for i in left_schema_len..full_schema_len {
                                 tuple.values[i] = NULL_VALUE.clone();
                             }
                         }
-                        JoinType::Right => {
+                        JoinType::RightOuter => {
                             for i in 0..left_schema_len {
                                 tuple.values[i] = NULL_VALUE.clone();
                             }
@@ -207,47 +230,91 @@ impl HashJoinStatus {
         Ok(Some(tuple))
     }
 
-    pub(crate) fn build_drop(&mut self) -> Option<BoxStream<'static, Tuple>> {
+    pub(crate) fn build_drop(&mut self) -> Option<BoxedExecutor> {
         let HashJoinStatus {
             full_schema_ref,
             build_map,
             ty,
+            filter,
+            left_schema_len,
             ..
         } = self;
 
-        matches!(ty, JoinType::Left | JoinType::Full).then(|| {
-            stream::iter(
-                build_map
-                    .drain()
-                    .filter_map(|(_, (mut left_tuples, is_used))| {
-                        if !is_used {
-                            for tuple in left_tuples.iter_mut() {
-                                tuple.schema_ref = full_schema_ref.clone();
-
-                                while tuple.values.len() != full_schema_ref.len() {
-                                    tuple.values.push(NULL_VALUE.clone());
-                                }
-                            }
-                            return Some(left_tuples);
-                        }
-                        None
-                    })
-                    .flatten(),
-            )
-            .boxed()
-        })
+        match ty {
+            JoinType::LeftOuter | JoinType::Full => {
+                Some(Self::right_null_tuple(build_map, full_schema_ref))
+            }
+            JoinType::LeftSemi | JoinType::LeftAnti => Some(Self::one_side_tuple(
+                build_map,
+                full_schema_ref,
+                filter,
+                ty,
+                *left_schema_len,
+            )),
+            _ => None,
+        }
+    }
+
+    #[try_stream(boxed, ok = Tuple, error = DatabaseError)]
+    async fn right_null_tuple<'a>(
+        build_map: &'a mut HashMap<Vec<ValueRef>, (Vec<Tuple>, bool, bool)>,
+        schema: &'a Schema,
+    ) {
+        for (_, (left_tuples, is_used, _)) in build_map.drain() {
+            if is_used {
+                continue;
+            }
+            for mut tuple in left_tuples {
+                while tuple.values.len() != schema.len() {
+                    tuple.values.push(NULL_VALUE.clone());
+                }
+                yield tuple;
+            }
+        }
+    }
+
+    #[try_stream(boxed, ok = Tuple, error = DatabaseError)]
+    async fn one_side_tuple<'a>(
+        build_map: &'a mut HashMap<Vec<ValueRef>, (Vec<Tuple>, bool, bool)>,
+        schema: &'a Schema,
+        filter: &'a Option<ScalarExpression>,
+        join_ty: &'a JoinType,
+        left_schema_len: usize,
+    ) {
+        let is_left_semi = matches!(join_ty, JoinType::LeftSemi);
+
+        for (_, (left_tuples, mut is_used, is_filtered)) in build_map.drain() {
+            if is_left_semi {
+                is_used = !is_used;
+            }
+            if is_used {
+                continue;
+            }
+            if is_filtered {
+                for tuple in left_tuples {
+                    yield tuple;
+                }
+                continue;
+            }
+            for tuple in left_tuples {
+                if let Some(tuple) = Self::filter(tuple, schema, filter, join_ty, left_schema_len)?
+                {
+                    yield tuple;
+                }
+            }
+        }
     }
 
     fn eval_keys(
         on_keys: &[ScalarExpression],
         tuple: &Tuple,
+        schema: &[ColumnRef],
     ) -> Result<Vec<ValueRef>, DatabaseError> {
         let mut values = Vec::with_capacity(on_keys.len());
 
         for expr in on_keys {
-            values.push(expr.eval(tuple)?);
+            values.push(expr.eval(tuple, schema)?);
         }
-
         Ok(values)
     }
 }
@@ -284,15 +351,16 @@ impl HashJoin {
         for tuple in build_read(right_input, transaction) {
             let tuple: Tuple = tuple?;
 
-            for tuple in join_status.right_probe(tuple)? {
-                yield tuple
+            #[for_await]
+            for tuple in join_status.right_probe(tuple) {
+                yield tuple?;
             }
         }
 
         if let Some(stream) = join_status.build_drop() {
             #[for_await]
             for tuple in stream {
-                yield tuple
+                yield tuple?;
             }
         };
     }
@@ -312,7 +380,6 @@ mod test {
     use crate::planner::LogicalPlan;
     use crate::storage::kip::KipStorage;
     use crate::storage::Storage;
-    use crate::types::tuple::create_table;
     use crate::types::value::DataValue;
     use crate::types::LogicalType;
     use std::sync::Arc;
@@ -419,8 +486,6 @@ mod test {
         let mut executor = HashJoin::from((op, left, right)).execute(&transaction);
         let tuples = try_collect(&mut executor).await?;
 
-        println!("inner_test: \n{}", create_table(&tuples));
-
         assert_eq!(tuples.len(), 3);
 
         assert_eq!(
@@ -451,31 +516,66 @@ mod test {
                 on: keys,
                 filter: None,
             },
-            join_type: JoinType::Left,
+            join_type: JoinType::LeftOuter,
         };
-        let mut executor = HashJoin::from((op, left, right)).execute(&transaction);
-        let tuples = try_collect(&mut executor).await?;
-
-        println!("left_test: \n{}", create_table(&tuples));
-
-        assert_eq!(tuples.len(), 4);
-
-        assert_eq!(
-            tuples[0].values,
-            build_integers(vec![Some(0), Some(2), Some(4), Some(0), Some(2), Some(4)])
-        );
-        assert_eq!(
-            tuples[1].values,
-            build_integers(vec![Some(1), Some(3), Some(5), Some(1), Some(3), Some(5)])
-        );
-        assert_eq!(
-            tuples[2].values,
-            build_integers(vec![Some(1), Some(3), Some(5), Some(1), Some(1), Some(1)])
-        );
-        assert_eq!(
-            tuples[3].values,
-            build_integers(vec![Some(3), Some(5), Some(7), None, None, None])
-        );
+        //Outer
+        {
+            let executor = HashJoin::from((op.clone(), left.clone(), right.clone()));
+            let tuples = try_collect(&mut executor.execute(&transaction)).await?;
+
+            assert_eq!(tuples.len(), 4);
+
+            assert_eq!(
+                tuples[0].values,
+                build_integers(vec![Some(0), Some(2), Some(4), Some(0), Some(2), Some(4)])
+            );
+            assert_eq!(
+                tuples[1].values,
+                build_integers(vec![Some(1), Some(3), Some(5), Some(1), Some(3), Some(5)])
+            );
+            assert_eq!(
+                tuples[2].values,
+                build_integers(vec![Some(1), Some(3), Some(5), Some(1), Some(1), Some(1)])
+            );
+            assert_eq!(
+                tuples[3].values,
+                build_integers(vec![Some(3), Some(5), Some(7), None, None, None])
+            );
+        }
+        // Semi
+        {
+            let mut executor = HashJoin::from((op.clone(), left.clone(), right.clone()));
+            executor.ty = JoinType::LeftSemi;
+            let mut tuples = try_collect(&mut executor.execute(&transaction)).await?;
+
+            assert_eq!(tuples.len(), 2);
+            tuples.sort_by_key(|tuple| {
+                let mut bytes = Vec::new();
+                tuple.values[0].memcomparable_encode(&mut bytes).unwrap();
+                bytes
+            });
+
+            assert_eq!(
+                tuples[0].values,
+                build_integers(vec![Some(0), Some(2), Some(4)])
+            );
+            assert_eq!(
+                tuples[1].values,
+                build_integers(vec![Some(1), Some(3), Some(5)])
+            );
+        }
+        // Anti
+        {
+            let mut executor = HashJoin::from((op, left, right));
+            executor.ty = JoinType::LeftAnti;
+            let tuples = try_collect(&mut executor.execute(&transaction)).await?;
+
+            assert_eq!(tuples.len(), 1);
+            assert_eq!(
+                tuples[0].values,
+                build_integers(vec![Some(3), Some(5), Some(7)])
+            );
+        }
 
         Ok(())
     }
@@ -492,13 +592,11 @@ mod test {
                 on: keys,
                 filter: None,
             },
-            join_type: JoinType::Right,
+            join_type: JoinType::RightOuter,
         };
         let mut executor = HashJoin::from((op, left, right)).execute(&transaction);
         let tuples = try_collect(&mut executor).await?;
 
-        println!("right_test: \n{}", create_table(&tuples));
-
         assert_eq!(tuples.len(), 4);
 
         assert_eq!(
@@ -538,8 +636,6 @@ mod test {
         let mut executor = HashJoin::from((op, left, right)).execute(&transaction);
         let tuples = try_collect(&mut executor).await?;
 
-        println!("full_test: \n{}", create_table(&tuples));
-
         assert_eq!(tuples.len(), 5);
 
         assert_eq!(
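The probe path above never emits joined tuples for `LeftSemi`/`LeftAnti`; it only records whether a build-side key was matched, and `build_drop` later streams the kept rows. A simplified model of that bookkeeping (toy types, and ignoring the `BitVector`-based filter tracking the real executor adds):

    use std::collections::HashMap;

    fn semi_anti_join(
        build: Vec<(i32, &'static str)>, // (join key, build-side row)
        probe: Vec<i32>,                 // join keys seen on the probe side
        anti: bool,                      // false => LeftSemi, true => LeftAnti
    ) -> Vec<&'static str> {
        let mut build_map: HashMap<i32, (Vec<&str>, bool)> = HashMap::new();
        for (key, row) in build {
            build_map
                .entry(key)
                .or_insert_with(|| (Vec::new(), false))
                .0
                .push(row);
        }
        // Probing only flips the `is_used` flag; nothing is emitted yet.
        for key in probe {
            if let Some((_, is_used)) = build_map.get_mut(&key) {
                *is_used = true;
            }
        }
        // Semi keeps matched groups, anti keeps unmatched ones -- mirroring the
        // `is_used = !is_used` inversion in `one_side_tuple`.
        let mut out = Vec::new();
        for (_, (rows, is_used)) in build_map {
            if is_used != anti {
                out.extend(rows);
            }
        }
        out
    }

Usage: `semi_anti_join(vec![(1, "l1"), (2, "l2")], vec![1], false)` yields `["l1"]`, while the `anti` variant yields `["l2"]`.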
diff --git a/src/execution/volcano/dql/join/mod.rs b/src/execution/volcano/dql/join/mod.rs
index 12036197..33781bd0 100644
--- a/src/execution/volcano/dql/join/mod.rs
+++ b/src/execution/volcano/dql/join/mod.rs
@@ -5,8 +5,8 @@ pub(crate) mod hash_join;
 pub fn joins_nullable(join_type: &JoinType) -> (bool, bool) {
     match join_type {
         JoinType::Inner => (false, false),
-        JoinType::Left => (false, true),
-        JoinType::Right => (true, false),
+        JoinType::LeftOuter | JoinType::LeftSemi | JoinType::LeftAnti => (false, true),
+        JoinType::RightOuter => (true, false),
         JoinType::Full => (true, true),
         JoinType::Cross => (true, true),
     }
diff --git a/src/execution/volcano/dql/projection.rs b/src/execution/volcano/dql/projection.rs
index 5fceb24a..06a858e1 100644
--- a/src/execution/volcano/dql/projection.rs
+++ b/src/execution/volcano/dql/projection.rs
@@ -6,7 +6,6 @@ use crate::planner::LogicalPlan;
 use crate::storage::Transaction;
 use crate::types::tuple::Tuple;
 use futures_async_stream::try_stream;
-use std::sync::Arc;
 
 pub struct Projection {
     exprs: Vec<ScalarExpression>,
@@ -28,26 +27,17 @@ impl<T: Transaction> ReadExecutor<T> for Projection {
 impl Projection {
     #[try_stream(boxed, ok = Tuple, error = DatabaseError)]
     pub async fn _execute<T: Transaction>(self, transaction: &T) {
-        let Projection { exprs, input } = self;
-        let mut schema_ref = None;
+        let Projection { exprs, mut input } = self;
+        let schema = input.output_schema().clone();
 
         #[for_await]
         for tuple in build_read(input, transaction) {
             let mut tuple = tuple?;
             let mut values = Vec::with_capacity(exprs.len());
-            let schema_ref = schema_ref.get_or_insert_with(|| {
-                let mut columns = Vec::with_capacity(exprs.len());
-
-                for expr in exprs.iter() {
-                    columns.push(expr.output_column());
-                }
-                Arc::new(columns)
-            });
 
             for expr in exprs.iter() {
-                values.push(expr.eval(&tuple)?);
+                values.push(expr.eval(&tuple, &schema)?);
             }
-            tuple.schema_ref = schema_ref.clone();
             tuple.values = values;
 
             yield tuple;
diff --git a/src/execution/volcano/dql/show_table.rs b/src/execution/volcano/dql/show_table.rs
index afd18ce8..49ff0b6a 100644
--- a/src/execution/volcano/dql/show_table.rs
+++ b/src/execution/volcano/dql/show_table.rs
@@ -1,4 +1,4 @@
-use crate::catalog::{ColumnCatalog, TableMeta};
+use crate::catalog::TableMeta;
 use crate::errors::DatabaseError;
 use crate::execution::volcano::{BoxedExecutor, ReadExecutor};
 use crate::storage::Transaction;
@@ -25,20 +25,12 @@ impl ShowTables {
             colum_meta_paths: histogram_paths,
         } in metas
         {
-            let schema_ref = Arc::new(vec![
-                Arc::new(ColumnCatalog::new_dummy("TABLE".to_string())),
-                Arc::new(ColumnCatalog::new_dummy("COLUMN_METAS_LEN".to_string())),
-            ]);
             let values = vec![
                 Arc::new(DataValue::Utf8(Some(table_name.to_string()))),
                 Arc::new(DataValue::UInt32(Some(histogram_paths.len() as u32))),
             ];
 
-            yield Tuple {
-                id: None,
-                schema_ref,
-                values,
-            };
+            yield Tuple { id: None, values };
         }
     }
 }
diff --git a/src/execution/volcano/dql/sort.rs b/src/execution/volcano/dql/sort.rs
index 0957fe0a..b0097449 100644
--- a/src/execution/volcano/dql/sort.rs
+++ b/src/execution/volcano/dql/sort.rs
@@ -3,7 +3,7 @@ use crate::execution::volcano::{build_read, BoxedExecutor, ReadExecutor};
 use crate::planner::operator::sort::{SortField, SortOperator};
 use crate::planner::LogicalPlan;
 use crate::storage::Transaction;
-use crate::types::tuple::Tuple;
+use crate::types::tuple::{Schema, Tuple};
 use futures_async_stream::try_stream;
 use itertools::Itertools;
 use std::mem;
@@ -34,6 +34,7 @@ pub(crate) fn radix_sort<T>(mut tuples: Vec<(T, Vec<u8>)>) -> Vec<T> {
 }
 
 pub(crate) fn sort(
+    schema: &Schema,
     sort_fields: &[SortField],
     tuples: Vec<Tuple>,
 ) -> Result<Vec<Tuple>, DatabaseError> {
@@ -50,7 +51,7 @@ pub(crate) fn sort(
         {
             let mut key = Vec::new();
 
-            expr.eval(&tuple)?.memcomparable_encode(&mut key)?;
+            expr.eval(&tuple, schema)?.memcomparable_encode(&mut key)?;
             key.push(if *nulls_first { u8::MIN } else { u8::MAX });
 
             if !asc {
@@ -95,15 +96,16 @@ impl Sort {
         let Sort {
             sort_fields,
             limit,
-            input,
+            mut input,
         } = self;
+        let schema = input.output_schema().clone();
         let mut tuples: Vec<Tuple> = vec![];
 
         #[for_await]
         for tuple in build_read(input, transaction) {
             tuples.push(tuple?);
         }
-        let mut tuples = sort(&sort_fields, tuples)?;
+        let mut tuples = sort(&schema, &sort_fields, tuples)?;
         let len = limit.unwrap_or(tuples.len());
 
         for tuple in tuples.drain(..len) {
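`sort` still reduces every sort field to a byte-comparable key: the value is memcomparable-encoded, a trailing `u8::MIN`/`u8::MAX` byte positions NULLs, and for descending fields the bytes are inverted so the same radix sort serves both directions. A toy version for `Option<u32>` values, with big-endian bytes standing in for `DataValue::memcomparable_encode`:

    fn sort_key(value: Option<u32>, asc: bool, nulls_first: bool) -> Vec<u8> {
        let mut key = Vec::new();
        if let Some(v) = value {
            key.extend_from_slice(&v.to_be_bytes()); // big-endian compares like the number
        }
        // NULL keys stay empty, so this trailing byte decides where they land.
        key.push(if nulls_first { u8::MIN } else { u8::MAX });
        if !asc {
            for byte in key.iter_mut() {
                *byte = !*byte; // inverted bytes reverse the comparison order
            }
        }
        key
    }

    fn main() {
        let mut rows = vec![Some(3u32), None, Some(1)];
        rows.sort_by_key(|v| sort_key(*v, true, true));
        assert_eq!(rows, vec![None, Some(1), Some(3)]);
    }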
diff --git a/src/execution/volcano/dql/values.rs b/src/execution/volcano/dql/values.rs
index 34c8a7fe..d276cc01 100644
--- a/src/execution/volcano/dql/values.rs
+++ b/src/execution/volcano/dql/values.rs
@@ -24,14 +24,10 @@ impl Values {
     #[try_stream(boxed, ok = Tuple, error = DatabaseError)]
     pub async fn _execute(self) {
-        let ValuesOperator { schema_ref, rows } = self.op;
+        let ValuesOperator { rows, .. } = self.op;
 
         for values in rows {
-            yield Tuple {
-                id: None,
-                schema_ref: schema_ref.clone(),
-                values,
-            };
+            yield Tuple { id: None, values };
         }
     }
 }
diff --git a/src/execution/volcano/mod.rs b/src/execution/volcano/mod.rs
index 35263f5a..3169b99b 100644
--- a/src/execution/volcano/mod.rs
+++ b/src/execution/volcano/mod.rs
@@ -125,7 +125,7 @@ pub fn build_write(plan: LogicalPlan, transaction: &mut T) -> Bo
         operator,
         mut childrens,
         physical_option,
-        _output_schema_ref: _out_schema_ref,
+        _output_schema_ref,
     } = plan;
 
     match operator {
@@ -171,7 +171,7 @@ pub fn build_write(plan: LogicalPlan, transaction: &mut T) -> Bo
                 operator,
                 childrens,
                 physical_option,
-                _output_schema_ref: _out_schema_ref,
+                _output_schema_ref,
             },
             transaction,
         ),
diff --git a/src/expression/evaluator.rs b/src/expression/evaluator.rs
index e60a699a..f00239d2 100644
--- a/src/expression/evaluator.rs
+++ b/src/expression/evaluator.rs
@@ -1,3 +1,4 @@
+use crate::catalog::ColumnRef;
 use crate::errors::DatabaseError;
 use crate::expression::function::ScalarFunction;
 use crate::expression::{AliasType, BinaryOperator, ScalarExpression};
@@ -14,8 +15,8 @@ lazy_static! {
 }
 
 macro_rules! eval_to_num {
-    ($num_expr:expr, $tuple:expr) => {
-        if let Some(num_i32) = DataValue::clone($num_expr.eval($tuple)?.as_ref())
+    ($num_expr:expr, $tuple:expr, $schema:expr) => {
+        if let Some(num_i32) = DataValue::clone($num_expr.eval($tuple, $schema)?.as_ref())
             .cast(&LogicalType::Integer)?
             .i32()
         {
@@ -27,7 +28,7 @@ macro_rules! eval_to_num {
 }
 
 impl ScalarExpression {
-    pub fn eval(&self, tuple: &Tuple) -> Result<ValueRef, DatabaseError> {
+    pub fn eval(&self, tuple: &Tuple, schema: &[ColumnRef]) -> Result<ValueRef, DatabaseError> {
         let check_cast = |value: ValueRef, return_type: &LogicalType| {
             if value.logical_type() != *return_type {
                 return Ok(Arc::new(DataValue::clone(&value).cast(return_type)?));
@@ -38,8 +39,7 @@ impl ScalarExpression {
         match self {
             ScalarExpression::Constant(val) => Ok(val.clone()),
             ScalarExpression::ColumnRef(col) => {
-                let value = tuple
-                    .schema_ref
+                let value = schema
                     .iter()
                     .find_position(|tul_col| tul_col.summary() == col.summary())
                     .map(|(i, _)| &tuple.values[i])
@@ -49,8 +49,7 @@ impl ScalarExpression {
                 Ok(value)
             }
             ScalarExpression::Alias { expr, alias } => {
-                if let Some(value) = tuple
-                    .schema_ref
+                if let Some(value) = schema
                     .iter()
                     .find_position(|tul_col| match alias {
                         AliasType::Name(alias) => {
@@ -65,10 +64,10 @@ impl ScalarExpression {
                     return Ok(value.clone());
                 }
 
-                expr.eval(tuple)
+                expr.eval(tuple, schema)
             }
             ScalarExpression::TypeCast { expr, ty, .. } => {
-                let value = expr.eval(tuple)?;
+                let value = expr.eval(tuple, schema)?;
 
                 Ok(Arc::new(DataValue::clone(&value).cast(ty)?))
             }
@@ -78,13 +77,13 @@ impl ScalarExpression {
                 op,
                 ..
             } => {
-                let left = left_expr.eval(tuple)?;
-                let right = right_expr.eval(tuple)?;
+                let left = left_expr.eval(tuple, schema)?;
+                let right = right_expr.eval(tuple, schema)?;
 
                 Ok(Arc::new(DataValue::binary_op(&left, &right, op)?))
             }
             ScalarExpression::IsNull { expr, negated } => {
-                let mut is_null = expr.eval(tuple)?.is_null();
+                let mut is_null = expr.eval(tuple, schema)?.is_null();
                 if *negated {
                     is_null = !is_null;
                 }
@@ -95,13 +94,13 @@ impl ScalarExpression {
                 args,
                 negated,
             } => {
-                let value = expr.eval(tuple)?;
+                let value = expr.eval(tuple, schema)?;
                 if value.is_null() {
                     return Ok(Arc::new(DataValue::Boolean(None)));
                 }
                 let mut is_in = false;
                 for arg in args {
-                    let arg_value = arg.eval(tuple)?;
+                    let arg_value = arg.eval(tuple, schema)?;
 
                     if arg_value.is_null() {
                         return Ok(Arc::new(DataValue::Boolean(None)));
@@ -117,7 +116,7 @@ impl ScalarExpression {
                 Ok(Arc::new(DataValue::Boolean(Some(is_in))))
             }
             ScalarExpression::Unary { expr, op, .. } => {
-                let value = expr.eval(tuple)?;
+                let value = expr.eval(tuple, schema)?;
 
                 Ok(Arc::new(DataValue::unary_op(&value, op)?))
             }
@@ -130,9 +129,9 @@ impl ScalarExpression {
                 right_expr,
                 negated,
             } => {
-                let value = expr.eval(tuple)?;
-                let left = left_expr.eval(tuple)?;
-                let right = right_expr.eval(tuple)?;
+                let value = expr.eval(tuple, schema)?;
+                let left = left_expr.eval(tuple, schema)?;
+                let right = right_expr.eval(tuple, schema)?;
 
                 let mut is_between = match (
                     value.partial_cmp(&left).map(Ordering::is_ge),
@@ -152,15 +151,16 @@ impl ScalarExpression {
                 for_expr,
                 from_expr,
             } => {
-                if let Some(mut string) = DataValue::clone(expr.eval(tuple)?.as_ref())
+                if let Some(mut string) = DataValue::clone(expr.eval(tuple, schema)?.as_ref())
                     .cast(&LogicalType::Varchar(None))?
                     .utf8()
                 {
                     if let Some(from_expr) = from_expr {
-                        string = string.split_off(eval_to_num!(from_expr, tuple).saturating_sub(1));
+                        string = string
+                            .split_off(eval_to_num!(from_expr, tuple, schema).saturating_sub(1));
                     }
                     if let Some(for_expr) = for_expr {
-                        let _ = string.split_off(eval_to_num!(for_expr, tuple));
+                        let _ = string.split_off(eval_to_num!(for_expr, tuple, schema));
                     }
 
                     Ok(Arc::new(DataValue::Utf8(Some(string))))
@@ -179,14 +179,14 @@ impl ScalarExpression {
                 let mut values = Vec::with_capacity(exprs.len());
 
                 for expr in exprs {
-                    values.push(expr.eval(tuple)?);
+                    values.push(expr.eval(tuple, schema)?);
                 }
                 Ok(Arc::new(DataValue::Tuple(
                     (!values.is_empty()).then_some(values),
                 )))
             }
             ScalarExpression::Function(ScalarFunction { inner, args, .. }) => Ok(Arc::new(
-                inner.eval(args, tuple)?.cast(inner.return_type())?,
+                inner.eval(args, tuple, schema)?.cast(inner.return_type())?,
             )),
             ScalarExpression::Empty => unreachable!(),
             ScalarExpression::If {
@@ -195,10 +195,10 @@ impl ScalarExpression {
                 right_expr,
                 ty,
             } => {
-                if condition.eval(tuple)?.is_true()? {
-                    check_cast(left_expr.eval(tuple)?, ty)
+                if condition.eval(tuple, schema)?.is_true()? {
+                    check_cast(left_expr.eval(tuple, schema)?, ty)
                 } else {
-                    check_cast(right_expr.eval(tuple)?, ty)
+                    check_cast(right_expr.eval(tuple, schema)?, ty)
                 }
             }
             ScalarExpression::IfNull {
@@ -206,10 +206,10 @@ impl ScalarExpression {
                 right_expr,
                 ty,
             } => {
-                let mut value = left_expr.eval(tuple)?;
+                let mut value = left_expr.eval(tuple, schema)?;
 
                 if value.is_null() {
-                    value = right_expr.eval(tuple)?;
+                    value = right_expr.eval(tuple, schema)?;
                 }
                 check_cast(value, ty)
             }
@@ -218,9 +218,9 @@ impl ScalarExpression {
                 right_expr,
                 ty,
             } => {
-                let mut value = left_expr.eval(tuple)?;
+                let mut value = left_expr.eval(tuple, schema)?;
 
-                if right_expr.eval(tuple)? == value {
+                if right_expr.eval(tuple, schema)? == value {
                     value = NULL_VALUE.clone();
                 }
                 check_cast(value, ty)
@@ -229,7 +229,7 @@ impl ScalarExpression {
                 let mut value = None;
 
                 for expr in exprs {
-                    let temp = expr.eval(tuple)?;
+                    let temp = expr.eval(tuple, schema)?;
 
                     if !temp.is_null() {
                         value = Some(temp);
@@ -248,10 +248,10 @@ impl ScalarExpression {
                 let mut result = None;
 
                 if let Some(expr) = operand_expr {
-                    operand_value = Some(expr.eval(tuple)?);
+                    operand_value = Some(expr.eval(tuple, schema)?);
                 }
                 for (when_expr, result_expr) in expr_pairs {
-                    let when_value = when_expr.eval(tuple)?;
+                    let when_value = when_expr.eval(tuple, schema)?;
                     let is_true = if let Some(operand_value) = &operand_value {
                         operand_value
                             .binary_op(&when_value, &BinaryOperator::Eq)?
@@ -260,13 +260,13 @@ impl ScalarExpression {
                         when_value.is_true()?
                     };
                     if is_true {
-                        result = Some(result_expr.eval(tuple)?);
+                        result = Some(result_expr.eval(tuple, schema)?);
                         break;
                     }
                 }
                 if result.is_none() {
                     if let Some(expr) = else_expr {
-                        result = Some(expr.eval(tuple)?);
+                        result = Some(expr.eval(tuple, schema)?);
                     }
                 }
                 check_cast(result.unwrap_or_else(|| NULL_VALUE.clone()), ty)
diff --git a/src/expression/function.rs b/src/expression/function.rs
index e7162f75..1922583d 100644
--- a/src/expression/function.rs
+++ b/src/expression/function.rs
@@ -1,3 +1,4 @@
+use crate::catalog::ColumnRef;
 use crate::errors::DatabaseError;
 use crate::expression::ScalarExpression;
 use crate::types::tuple::Tuple;
@@ -40,7 +41,12 @@ pub struct FunctionSummary {
 }
 
 pub trait ScalarFunctionImpl: Debug + Send + Sync {
-    fn eval(&self, args: &[ScalarExpression], tuple: &Tuple) -> Result<ValueRef, DatabaseError>;
+    fn eval(
+        &self,
+        args: &[ScalarExpression],
+        tuple: &Tuple,
+        schema: &[ColumnRef],
+    ) -> Result<ValueRef, DatabaseError>;
 
     // TODO: Exploiting monotonicity when optimizing `ScalarFunctionImpl::monotonicity()`
     fn monotonicity(&self) -> Option<FuncMonotonicity>;
diff --git a/src/expression/mod.rs b/src/expression/mod.rs
index a1823dc6..bcad26aa 100644
--- a/src/expression/mod.rs
+++ b/src/expression/mod.rs
@@ -115,7 +115,7 @@ pub enum ScalarExpression {
 }
 
 impl ScalarExpression {
-    pub fn unpack_alias(&self) -> &ScalarExpression {
+    pub fn unpack_alias(self) -> ScalarExpression {
         if let ScalarExpression::Alias { expr, .. } = self {
             expr.unpack_alias()
         } else {
@@ -123,8 +123,16 @@ impl ScalarExpression {
         }
     }
 
+    pub fn unpack_alias_ref(&self) -> &ScalarExpression {
+        if let ScalarExpression::Alias { expr, .. } = self {
+            expr.unpack_alias_ref()
+        } else {
+            self
+        }
+    }
+
     pub fn try_reference(&mut self, output_exprs: &[ScalarExpression]) {
-        let fn_output_column = |expr: &ScalarExpression| expr.unpack_alias().output_column();
+        let fn_output_column = |expr: &ScalarExpression| expr.unpack_alias_ref().output_column();
         let self_column = fn_output_column(self);
         if let Some((pos, _)) = output_exprs
             .iter()
diff --git a/src/marcos/mod.rs b/src/marcos/mod.rs
index ca9b3447..9b48cee3 100644
--- a/src/marcos/mod.rs
+++ b/src/marcos/mod.rs
@@ -24,11 +24,11 @@
 #[macro_export]
 macro_rules! implement_from_tuple {
     ($struct_name:ident, ($($field_name:ident : $field_type:ty => $closure:expr),+)) => {
-        impl From<Tuple> for $struct_name {
-            fn from(tuple: Tuple) -> Self {
-                fn try_get<T: 'static>(tuple: &Tuple, field_name: &str) -> Option<DataValue> {
+        impl From<(&SchemaRef, Tuple)> for $struct_name {
+            fn from((schema, tuple): (&SchemaRef, Tuple)) -> Self {
+                fn try_get<T: 'static>(tuple: &Tuple, schema: &SchemaRef, field_name: &str) -> Option<DataValue> {
                     let ty = LogicalType::type_trans::<T>()?;
-                    let (idx, _) = tuple.schema_ref
+                    let (idx, _) = schema
                         .iter()
                         .enumerate()
                         .find(|(_, col)| col.name() == field_name)?;
@@ -40,7 +40,7 @@ macro_rules! implement_from_tuple {
                 let mut struct_instance = $struct_name::default();
                 $(
-                    if let Some(value) = try_get::<$field_type>(&tuple, stringify!($field_name)) {
+                    if let Some(value) = try_get::<$field_type>(&tuple, schema, stringify!($field_name)) {
                         $closure(
                             &mut struct_instance,
                             value
@@ -84,7 +84,7 @@ macro_rules! function {
                 Arc::new(Self {
                     summary: FunctionSummary {
-                        name: function_name.to_string(),
+                        name: function_name,
                         arg_types
                     }
                 })
@@ -92,11 +92,11 @@ macro_rules! function {
         }
 
         impl ScalarFunctionImpl for $struct_name {
-            fn eval(&self, args: &[ScalarExpression], tuple: &Tuple) -> Result<ValueRef, DatabaseError> {
+            fn eval(&self, args: &[ScalarExpression], tuple: &Tuple, schema: &[ColumnRef]) -> Result<ValueRef, DatabaseError> {
                 let mut _index = 0;
 
                 $closure($({
-                    let mut value = args[_index].eval(tuple)?;
+                    let mut value = args[_index].eval(tuple, schema)?;
                     _index += 1;
 
                     if value.logical_type() != $arg_ty {
@@ -123,18 +123,19 @@ macro_rules! function {
 
 #[cfg(test)]
 mod test {
+    use crate::catalog::ColumnRef;
     use crate::catalog::{ColumnCatalog, ColumnDesc};
     use crate::errors::DatabaseError;
     use crate::expression::function::{FuncMonotonicity, FunctionSummary, ScalarFunctionImpl};
     use crate::expression::BinaryOperator;
     use crate::expression::ScalarExpression;
-    use crate::types::tuple::Tuple;
+    use crate::types::tuple::{SchemaRef, Tuple};
     use crate::types::value::{DataValue, ValueRef};
     use crate::types::LogicalType;
     use std::sync::Arc;
 
-    fn build_tuple() -> Tuple {
-        let columns = Arc::new(vec![
+    fn build_tuple() -> (Tuple, SchemaRef) {
+        let schema_ref = Arc::new(vec![
             Arc::new(ColumnCatalog::new(
                 "c1".to_string(),
                 false,
@@ -151,11 +152,7 @@ mod test {
             Arc::new(DataValue::Utf8(Some("LOL".to_string()))),
         ];
 
-        Tuple {
-            id: None,
-            schema_ref: columns,
-            values,
-        }
+        (Tuple { id: None, values }, schema_ref)
     }
 
     #[derive(Default, Debug, PartialEq)]
@@ -181,7 +178,8 @@ mod test {
     #[test]
     fn test_from_tuple() {
-        let my_struct = MyStruct::from(build_tuple());
+        let (tuple, schema_ref) = build_tuple();
+        let my_struct = MyStruct::from((&schema_ref, tuple));
 
         println!("{:?}", my_struct);
 
@@ -203,9 +201,9 @@ mod test {
             ],
             &Tuple {
                 id: None,
-                schema_ref: Arc::new(vec![]),
                 values: vec![],
             },
+            &vec![],
        )?;
 
         println!("{:?}", function);
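The `function!` macro and `ScalarFunctionImpl` trait now thread the schema through `eval` as well — the test above calls a function with constant arguments and an empty schema (`&vec![]`). A toy schema-aware scalar function, with simplified types standing in for `ScalarExpression` and `DataValue` (not the crate's real signatures):

    use std::sync::Arc;

    struct Column {
        name: String,
    }
    type ColumnRef = Arc<Column>;

    struct Tuple {
        values: Vec<i64>, // toy values
    }

    // Stand-in for ScalarFunctionImpl: eval receives the schema explicitly.
    trait ScalarFn {
        fn eval(&self, arg: &str, tuple: &Tuple, schema: &[ColumnRef]) -> Option<i64>;
    }

    struct PlusOne;

    impl ScalarFn for PlusOne {
        fn eval(&self, arg: &str, tuple: &Tuple, schema: &[ColumnRef]) -> Option<i64> {
            // Resolve the argument column positionally through the schema slice.
            let i = schema.iter().position(|c| c.name == arg)?;
            Some(tuple.values[i] + 1)
        }
    }

    fn main() {
        let schema = vec![Arc::new(Column { name: "a".into() })];
        let tuple = Tuple { values: vec![41] };
        assert_eq!(PlusOne.eval("a", &tuple, &schema), Some(42));
    }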
diff --git a/src/optimizer/rule/normalization/pushdown_limit.rs b/src/optimizer/rule/normalization/pushdown_limit.rs
index e2f4413c..5df0beb0 100644
--- a/src/optimizer/rule/normalization/pushdown_limit.rs
+++ b/src/optimizer/rule/normalization/pushdown_limit.rs
@@ -139,8 +139,10 @@ impl NormalizationRule for PushLimitThroughJoin {
                 let children = graph.children_at(child_id).collect_vec();
 
                 if let Some(grandson_id) = match ty {
-                    JoinType::Left => children.first(),
-                    JoinType::Right => children.last(),
+                    JoinType::LeftOuter | JoinType::LeftSemi | JoinType::LeftAnti => {
+                        children.first()
+                    }
+                    JoinType::RightOuter => children.last(),
                     _ => None,
                 } {
                     graph.add_node(child_id, Some(*grandson_id), Operator::Limit(op.clone()));
diff --git a/src/optimizer/rule/normalization/pushdown_predicates.rs b/src/optimizer/rule/normalization/pushdown_predicates.rs
index 3dfe6cd2..293e11f4 100644
--- a/src/optimizer/rule/normalization/pushdown_predicates.rs
+++ b/src/optimizer/rule/normalization/pushdown_predicates.rs
@@ -111,7 +111,11 @@ impl NormalizationRule for PushPredicateThroughJoin {
             if let Operator::Join(child_op) = graph.operator(child_id) {
                 if !matches!(
                     child_op.join_type,
-                    JoinType::Inner | JoinType::Left | JoinType::Right
+                    JoinType::Inner
+                        | JoinType::LeftOuter
+                        | JoinType::LeftSemi
+                        | JoinType::LeftAnti
+                        | JoinType::RightOuter
                 ) {
                     return Ok(());
                 }
@@ -149,7 +153,7 @@ impl NormalizationRule for PushPredicateThroughJoin {
 
                         common_filters
                     }
-                    JoinType::Left => {
+                    JoinType::LeftOuter | JoinType::LeftSemi | JoinType::LeftAnti => {
                         if !left_filters.is_empty() {
                             if let Some(left_filter_op) = reduce_filters(left_filters, op.having) {
                                 new_ops.0 = Some(Operator::Filter(left_filter_op));
@@ -161,7 +165,7 @@ impl NormalizationRule for PushPredicateThroughJoin {
                             .chain(right_filters)
                             .collect_vec()
                     }
-                    JoinType::Right => {
+                    JoinType::RightOuter => {
                         if !right_filters.is_empty() {
                             if let Some(right_filter_op) = reduce_filters(right_filters, op.having)
                             {
diff --git a/src/planner/mod.rs b/src/planner/mod.rs
index 18cd62ab..6499a5a3 100644
--- a/src/planner/mod.rs
+++ b/src/planner/mod.rs
@@ -1,6 +1,7 @@
 pub mod operator;
 
-use crate::catalog::TableName;
+use crate::catalog::{ColumnCatalog, TableName};
+use crate::planner::operator::join::JoinType;
 use crate::planner::operator::union::UnionOperator;
 use crate::planner::operator::values::ValuesOperator;
 use crate::planner::operator::{Operator, PhysicalOption};
@@ -61,7 +62,10 @@ impl LogicalPlan {
                     .collect_vec();
                 Arc::new(out_columns)
             }
-            Operator::Join(_) => {
+            Operator::Join(op) => {
+                if matches!(op.join_type, JoinType::LeftSemi | JoinType::LeftAnti) {
+                    return self.childrens[0].output_schema().clone();
+                }
                 let out_columns = self
                     .childrens
                     .iter_mut()
@@ -85,30 +89,54 @@ impl LogicalPlan {
                     .collect_vec();
                 Arc::new(out_columns)
             }
-            Operator::Values(ValuesOperator { schema_ref, .. }) => schema_ref.clone(),
-            Operator::Union(UnionOperator {
-                left_schema_ref,
-                right_schema_ref,
-            }) => {
-                let mut schema = Vec::clone(left_schema_ref);
-                schema.extend_from_slice(right_schema_ref.as_slice());
-                Arc::new(schema)
+            Operator::Values(ValuesOperator { schema_ref, .. })
+            | Operator::Union(UnionOperator { schema_ref }) => schema_ref.clone(),
+            Operator::Dummy => Arc::new(vec![]),
+            Operator::Show => Arc::new(vec![
+                Arc::new(ColumnCatalog::new_dummy("TABLE".to_string())),
+                Arc::new(ColumnCatalog::new_dummy("COLUMN_METAS_LEN".to_string())),
+            ]),
+            Operator::Explain => {
+                Arc::new(vec![Arc::new(ColumnCatalog::new_dummy("PLAN".to_string()))])
             }
-            Operator::Dummy
-            | Operator::Show
-            | Operator::Explain
-            | Operator::Describe(_)
-            | Operator::Insert(_)
-            | Operator::Update(_)
-            | Operator::Delete(_)
-            | Operator::Analyze(_)
-            | Operator::AddColumn(_)
-            | Operator::DropColumn(_)
-            | Operator::CreateTable(_)
-            | Operator::DropTable(_)
-            | Operator::Truncate(_)
-            | Operator::CopyFromFile(_)
-            | Operator::CopyToFile(_) => Arc::new(vec![]),
+            Operator::Describe(_) => Arc::new(vec![
+                Arc::new(ColumnCatalog::new_dummy("FIELD".to_string())),
+                Arc::new(ColumnCatalog::new_dummy("TYPE".to_string())),
+                Arc::new(ColumnCatalog::new_dummy("NULL".to_string())),
+                Arc::new(ColumnCatalog::new_dummy("Key".to_string())),
+                Arc::new(ColumnCatalog::new_dummy("DEFAULT".to_string())),
+            ]),
+            Operator::Insert(_) => Arc::new(vec![Arc::new(ColumnCatalog::new_dummy(
+                "INSERTED".to_string(),
+            ))]),
+            Operator::Update(_) => Arc::new(vec![Arc::new(ColumnCatalog::new_dummy(
+                "UPDATED".to_string(),
+            ))]),
+            Operator::Delete(_) => Arc::new(vec![Arc::new(ColumnCatalog::new_dummy(
+                "DELETED".to_string(),
+            ))]),
+            Operator::Analyze(_) => Arc::new(vec![Arc::new(ColumnCatalog::new_dummy(
+                "COLUMN_META_PATH".to_string(),
+            ))]),
+            Operator::AddColumn(_) => Arc::new(vec![Arc::new(ColumnCatalog::new_dummy(
+                "ADD COLUMN SUCCESS".to_string(),
+            ))]),
+            Operator::DropColumn(_) => Arc::new(vec![Arc::new(ColumnCatalog::new_dummy(
+                "DROP COLUMN SUCCESS".to_string(),
+            ))]),
+            Operator::CreateTable(_) => Arc::new(vec![Arc::new(ColumnCatalog::new_dummy(
+                "CREATE TABLE SUCCESS".to_string(),
+            ))]),
+            Operator::DropTable(_) => Arc::new(vec![Arc::new(ColumnCatalog::new_dummy(
+                "DROP TABLE SUCCESS".to_string(),
+            ))]),
+            Operator::Truncate(_) => Arc::new(vec![Arc::new(ColumnCatalog::new_dummy(
+                "TRUNCATE TABLE SUCCESS".to_string(),
+            ))]),
+            Operator::CopyFromFile(_) => Arc::new(vec![Arc::new(ColumnCatalog::new_dummy(
+                "COPY FROM SOURCE".to_string(),
+            ))]),
+            Operator::CopyToFile(_) => todo!(),
         })
     }
diff --git a/src/planner/operator/join.rs b/src/planner/operator/join.rs
index ce4fdaef..4075f970 100644
--- a/src/planner/operator/join.rs
+++ b/src/planner/operator/join.rs
@@ -10,8 +10,10 @@ use super::Operator;
 #[derive(Debug, Display, PartialEq, Eq, Clone, Copy, Hash, Ord, PartialOrd)]
 pub enum JoinType {
     Inner,
-    Left,
-    Right,
+    LeftOuter,
+    LeftSemi,
+    LeftAnti,
+    RightOuter,
     Full,
     Cross,
 }
diff --git a/src/planner/operator/mod.rs b/src/planner/operator/mod.rs
index 976fc165..f58fd0df 100644
--- a/src/planner/operator/mod.rs
+++ b/src/planner/operator/mod.rs
@@ -127,24 +127,14 @@ impl Operator {
                     .collect_vec(),
             ),
             Operator::Sort(_) | Operator::Limit(_) => None,
-            Operator::Values(ValuesOperator { schema_ref, .. }) => Some(
+            Operator::Values(ValuesOperator { schema_ref, .. })
+            | Operator::Union(UnionOperator { schema_ref }) => Some(
                 schema_ref
                     .iter()
                     .cloned()
                     .map(ScalarExpression::ColumnRef)
                     .collect_vec(),
             ),
-            Operator::Union(UnionOperator {
-                left_schema_ref,
-                right_schema_ref,
-            }) => Some(
-                left_schema_ref
-                    .iter()
-                    .chain(right_schema_ref.iter())
-                    .cloned()
-                    .map(ScalarExpression::ColumnRef)
-                    .collect_vec(),
-            ),
             Operator::Show
             | Operator::Explain
             | Operator::Describe(_)
@@ -203,15 +193,8 @@ impl Operator {
                 .map(|field| &field.expr)
                 .flat_map(|expr| expr.referenced_columns(only_column_ref))
                 .collect_vec(),
-            Operator::Values(ValuesOperator { schema_ref, .. }) => Vec::clone(schema_ref),
-            Operator::Union(UnionOperator {
-                left_schema_ref,
-                right_schema_ref,
-            }) => {
-                let mut schema = Vec::clone(left_schema_ref);
-                schema.extend_from_slice(right_schema_ref.as_slice());
-                schema
-            }
+            Operator::Values(ValuesOperator { schema_ref, .. })
+            | Operator::Union(UnionOperator { schema_ref }) => Vec::clone(schema_ref),
             Operator::Analyze(op) => op.columns.clone(),
             Operator::Delete(op) => vec![op.primary_key_column.clone()],
             Operator::Dummy
diff --git a/src/planner/operator/union.rs b/src/planner/operator/union.rs
index 071fa05f..32cc73ef 100644
--- a/src/planner/operator/union.rs
+++ b/src/planner/operator/union.rs
@@ -4,24 +4,20 @@ use crate::types::tuple::SchemaRef;
 use itertools::Itertools;
 use std::fmt;
 use std::fmt::Formatter;
+
 #[derive(Debug, PartialEq, Eq, Clone, Hash)]
 pub struct UnionOperator {
-    pub left_schema_ref: SchemaRef,
-    pub right_schema_ref: SchemaRef,
+    pub schema_ref: SchemaRef,
 }
 
 impl UnionOperator {
     pub fn build(
-        left_schema_ref: SchemaRef,
-        right_schema_ref: SchemaRef,
+        schema_ref: SchemaRef,
         left_plan: LogicalPlan,
         right_plan: LogicalPlan,
     ) -> LogicalPlan {
         LogicalPlan::new(
-            Operator::Union(UnionOperator {
-                left_schema_ref,
-                right_schema_ref,
-            }),
+            Operator::Union(UnionOperator { schema_ref }),
             vec![left_plan, right_plan],
         )
     }
@@ -29,22 +25,13 @@ impl UnionOperator {
 
 impl fmt::Display for UnionOperator {
     fn fmt(&self, f: &mut Formatter) -> fmt::Result {
-        let left_columns = self
-            .left_schema_ref
-            .iter()
-            .map(|column| column.name().to_string())
-            .join(", ");
-        let right_columns = self
-            .right_schema_ref
+        let schema = self
+            .schema_ref
             .iter()
             .map(|column| column.name().to_string())
             .join(", ");
 
-        write!(
-            f,
-            "Union left: [{}], right: [{}]",
-            left_columns, right_columns
-        )?;
+        write!(f, "Union: [{}]", schema)?;
 
         Ok(())
     }
diff --git a/src/planner/operator/values.rs b/src/planner/operator/values.rs
index 330c9772..940426df 100644
--- a/src/planner/operator/values.rs
+++ b/src/planner/operator/values.rs
@@ -13,12 +13,15 @@ pub struct ValuesOperator {
 impl fmt::Display for ValuesOperator {
     fn fmt(&self, f: &mut Formatter) -> fmt::Result {
         let columns = self
-            .schema_ref
+            .rows
             .iter()
-            .map(|column| column.name().to_string())
+            .map(|row| {
+                let row_string = row.iter().map(|value| format!("{value}")).join(", ");
+
+                format!("[{row_string}]")
+            })
             .join(", ");
 
-        write!(f, "Values [{}], RowsLen: {}", columns, self.rows.len())?;
+        write!(f, "Values {}, RowsLen: {}", columns, self.rows.len())?;
 
         Ok(())
     }
diff --git a/src/storage/kip.rs b/src/storage/kip.rs
index 9807998d..b7b0c5c8 100644
--- a/src/storage/kip.rs
+++ b/src/storage/kip.rs
@@ -13,7 +13,6 @@ use kip_db::kernel::lsm::mvcc::{CheckType, TransactionIter};
 use kip_db::kernel::lsm::storage::Config;
 use kip_db::kernel::lsm::{mvcc, storage};
 use kip_db::kernel::utils::lru_cache::ShardingLruCache;
-use kip_db::KernelError;
 use std::collections::hash_map::RandomState;
 use std::collections::{Bound, VecDeque};
 use std::ops::SubAssign;
@@ -141,16 +140,14 @@ impl Transaction for KipTransaction {
         &mut self,
         table_name: &str,
         index: Index,
-        tuple_ids: Vec<TupleId>,
+        tuple_id: &TupleId,
         is_unique: bool,
     ) -> Result<(), DatabaseError> {
-        let (key, value) = TableCodec::encode_index(table_name, &index, &tuple_ids)?;
+        let (key, value) = TableCodec::encode_index(table_name, &index, tuple_id)?;
 
         if let Some(bytes) = self.tx.get(&key)? {
             if is_unique {
-                let old_tuple_ids = TableCodec::decode_index(&bytes)?;
-
-                return if old_tuple_ids[0] != tuple_ids[0] {
+                return if bytes != value {
                     Err(DatabaseError::DuplicateUniqueValue)
                 } else {
                     Ok(())
@@ -224,7 +221,7 @@ impl Transaction for KipTransaction {
                     vec![col_id],
                     true,
                     false,
-                );
+                )?;
                 let (key, value) = TableCodec::encode_index_meta(table_name, meta_ref)?;
                 self.tx.set(key, value);
             }
@@ -244,11 +241,13 @@ impl Transaction for KipTransaction {
         &mut self,
         table_name: &TableName,
         column_name: &str,
-        if_exists: bool,
     ) -> Result<(), DatabaseError> {
         if let Some(catalog) = self.table(table_name.clone()).cloned() {
             let column = catalog.get_column_by_name(column_name).unwrap();
 
+            let (key, _) = TableCodec::encode_column(table_name, column)?;
+            self.tx.remove(&key)?;
+
             if let Some(index_meta) = catalog.get_unique_index(&column.id().unwrap()) {
                 let (index_meta_key, _) = TableCodec::encode_index_meta(table_name, index_meta)?;
                 self.tx.remove(&index_meta_key)?;
@@ -256,17 +255,7 @@ impl Transaction for KipTransaction {
                 let (index_min, index_max) = TableCodec::index_bound(table_name, &index_meta.id);
                 Self::_drop_data(&mut self.tx, &index_min, &index_max)?;
             }
-            let (key, _) = TableCodec::encode_column(table_name, column)?;
 
-            match self.tx.remove(&key) {
-                Ok(_) => (),
-                Err(KernelError::KeyNotFound) => {
-                    if !if_exists {
-                        Err(KernelError::KeyNotFound)?;
-                    }
-                }
-                err => err?,
-            }
             self.table_cache.remove(table_name);
 
             Ok(())
@@ -471,7 +460,7 @@ impl KipTransaction {
                     vec![col_id],
                     col.desc.is_unique,
                     is_primary,
-                );
+                )?;
                 let (key, value) = TableCodec::encode_index_meta(&table_name, meta_ref)?;
                 tx.set(key, value);
             }
@@ -574,7 +563,6 @@ mod test {
             &"test".to_string(),
             Tuple {
                 id: Some(Arc::new(DataValue::Int32(Some(1)))),
-                schema_ref: columns.clone(),
                 values: vec![
                     Arc::new(DataValue::Int32(Some(1))),
                     Arc::new(DataValue::Boolean(Some(true))),
@@ -586,7 +574,6 @@ mod test {
             &"test".to_string(),
             Tuple {
                 id: Some(Arc::new(DataValue::Int32(Some(2)))),
-                schema_ref: columns.clone(),
                 values: vec![
                     Arc::new(DataValue::Int32(Some(2))),
                     Arc::new(DataValue::Boolean(Some(false))),
@@ -640,6 +627,7 @@ mod test {
             id: 0,
             column_ids: vec![0],
             table_name,
+            pk_ty: LogicalType::Integer,
             name: "pk_a".to_string(),
             is_unique: false,
             is_primary: true,
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 5a82cffa..c124b211 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -53,7 +53,7 @@ pub trait Transaction: Sync + Send + 'static {
         &mut self,
         table_name: &str,
         index: Index,
-        tuple_ids: Vec<TupleId>,
+        tuple_id: &TupleId,
         is_unique: bool,
     ) -> Result<(), DatabaseError>;
 
@@ -75,12 +75,7 @@ pub trait Transaction: Sync + Send + 'static {
         if_not_exists: bool,
     ) -> Result<ColumnId, DatabaseError>;
 
-    fn drop_column(
-        &mut self,
-        table_name: &TableName,
-        column: &str,
-        if_exists: bool,
-    ) -> Result<(), DatabaseError>;
+    fn drop_column(&mut self, table_name: &TableName, column: &str) -> Result<(), DatabaseError>;
 
     fn create_table(
         &mut self,
@@ -178,7 +173,7 @@ impl Iter for IndexIter<'_> {
                 if Self::offset_move(&mut self.offset) {
                     continue;
                 }
-                match value {
+                let tuple = match value {
                     IndexValue::PrimaryKey(tuple) => {
                         if let Some(num) = self.limit.as_mut() {
                             num.sub_assign(1);
@@ -187,11 +182,12 @@ impl Iter for IndexIter<'_> {
                         return Ok(Some(tuple));
                     }
                     IndexValue::Normal(tuple_id) => {
-                        if let Some(tuple) = self.get_tuple_by_id(&tuple_id)? {
-                            return Ok(Some(tuple));
-                        }
+                        self.get_tuple_by_id(&tuple_id)?.ok_or_else(|| {
+                            DatabaseError::NotFound("index's tuple_id", tuple_id.to_string())
+                        })?
                     }
-                }
+                };
+                return Ok(Some(tuple));
             }
             assert!(self.index_values.is_empty());
@@ -201,7 +197,7 @@ impl Iter for IndexIter<'_> {
                     let mut has_next = false;
                     while let Some((_, value_option)) = iter.try_next()? {
                         if let Some(value) = value_option {
-                            if self.index_meta.is_primary {
+                            let index = if self.index_meta.is_primary {
                                 let tuple = TableCodec::decode_tuple(
                                     &self.table.types(),
                                     &self.projections,
@@ -209,12 +205,11 @@ impl Iter for IndexIter<'_> {
                                     &value,
                                 );
 
-                                self.index_values.push_back(IndexValue::PrimaryKey(tuple));
+                                IndexValue::PrimaryKey(tuple)
                             } else {
-                                for tuple_id in TableCodec::decode_index(&value)? {
-                                    self.index_values.push_back(IndexValue::Normal(tuple_id));
-                                }
-                            }
+                                IndexValue::Normal(TableCodec::decode_index(&value, &self.index_meta.pk_ty))
+                            };
+                            self.index_values.push_back(index);
                             has_next = true;
                             break;
                         }
@@ -265,10 +260,11 @@ impl Iter for IndexIter<'_> {
                 ConstantBinary::Eq(val) => {
                     let key = self.val_to_key(val)?;
                     if let Some(bytes) = self.tx.get(&key)? {
-                        if self.index_meta.is_unique {
-                            for tuple_id in TableCodec::decode_index(&bytes)? {
-                                self.index_values.push_back(IndexValue::Normal(tuple_id));
-                            }
+                        let index = if self.index_meta.is_unique {
+                            IndexValue::Normal(TableCodec::decode_index(
+                                &bytes,
+                                &self.index_meta.pk_ty,
+                            ))
                         } else if self.index_meta.is_primary {
                             let tuple = TableCodec::decode_tuple(
                                 &self.table.types(),
@@ -277,10 +273,11 @@ impl Iter for IndexIter<'_> {
                                 &bytes,
                             );
 
-                            self.index_values.push_back(IndexValue::PrimaryKey(tuple));
+                            IndexValue::PrimaryKey(tuple)
                         } else {
                             todo!()
-                        }
+                        };
+                        self.index_values.push_back(index);
                     }
                     self.scope_iter = None;
                 }
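Secondary-index entries now store the referenced primary key as raw value bytes (`tuple_id.to_raw()`), one id per entry, instead of a bincode-serialized `Vec<TupleId>` — which is also why `IndexMeta` grows a `pk_ty` field: the decoder needs the key's logical type to reinterpret the bytes. A toy round trip with the primary key fixed to `i32` (the real codec goes through `DataValue::from_raw`):

    // Hypothetical stand-ins for DataValue::to_raw / DataValue::from_raw,
    // specialized to i32 for illustration only.
    fn to_raw(pk: i32) -> Vec<u8> {
        pk.to_le_bytes().to_vec()
    }

    fn from_raw(bytes: &[u8]) -> i32 {
        let mut buf = [0u8; 4];
        buf.copy_from_slice(&bytes[..4]);
        i32::from_le_bytes(buf)
    }

    fn main() {
        // Mirrors encode_index/decode_index: the value side is just the pk bytes,
        // so a unique-constraint check can compare encoded bytes directly.
        let bytes = to_raw(42);
        assert_eq!(from_raw(&bytes), 42);
    }

Storing raw bytes also explains the simplified uniqueness check in `add_index` above: two entries for the same key are duplicates exactly when their encoded values differ.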
diff --git a/src/storage/table_codec.rs b/src/storage/table_codec.rs
index 0e377f41..e9f01959 100644
--- a/src/storage/table_codec.rs
+++ b/src/storage/table_codec.rs
@@ -1,10 +1,12 @@
 use crate::catalog::{ColumnCatalog, TableMeta};
 use crate::errors::DatabaseError;
 use crate::types::index::{Index, IndexId, IndexMeta};
-use crate::types::tuple::{SchemaRef, Tuple, TupleId};
+use crate::types::tuple::{Schema, Tuple, TupleId};
+use crate::types::value::DataValue;
 use crate::types::LogicalType;
 use bytes::Bytes;
 use lazy_static::lazy_static;
+use std::sync::Arc;
 
 const BOUND_MIN_TAG: u8 = 0;
 const BOUND_MAX_TAG: u8 = 1;
@@ -173,10 +175,10 @@ impl TableCodec {
     pub fn decode_tuple(
         table_types: &[LogicalType],
         projections: &[usize],
-        tuple_schema_ref: &SchemaRef,
+        schema: &Schema,
         bytes: &[u8],
     ) -> Tuple {
-        Tuple::deserialize_from(table_types, projections, tuple_schema_ref, bytes)
+        Tuple::deserialize_from(table_types, projections, schema, bytes)
     }
 
     /// Key: {TableName}{INDEX_META_TAG}{BOUND_MIN_TAG}{IndexID}
@@ -212,14 +214,11 @@ impl TableCodec {
     pub fn encode_index(
         name: &str,
         index: &Index,
-        tuple_ids: &[TupleId],
+        tuple_id: &TupleId,
     ) -> Result<(Bytes, Bytes), DatabaseError> {
         let key = TableCodec::encode_index_key(name, index)?;
 
-        Ok((
-            Bytes::from(key),
-            Bytes::from(bincode::serialize(tuple_ids)?),
-        ))
+        Ok((Bytes::from(key), Bytes::from(tuple_id.to_raw())))
     }
 
     pub fn encode_index_key(name: &str, index: &Index) -> Result<Vec<u8>, DatabaseError> {
@@ -236,8 +235,8 @@ impl TableCodec {
         Ok(key_prefix)
     }
 
-    pub fn decode_index(bytes: &[u8]) -> Result<Vec<TupleId>, DatabaseError> {
-        Ok(bincode::deserialize(bytes)?)
+    pub fn decode_index(bytes: &[u8], primary_key_ty: &LogicalType) -> TupleId {
+        Arc::new(DataValue::from_raw(bytes, primary_key_ty))
     }
 
     /// Key: {TableName}{COLUMN_TAG}{BOUND_MIN_TAG}{ColumnId}
@@ -316,17 +315,16 @@ mod tests {
         let tuple = Tuple {
             id: Some(Arc::new(DataValue::Int32(Some(0)))),
-            schema_ref: table_catalog.schema_ref().clone(),
             values: vec![
                 Arc::new(DataValue::Int32(Some(0))),
                 Arc::new(DataValue::Decimal(Some(Decimal::new(1, 0)))),
             ],
         };
         let (_, bytes) = TableCodec::encode_tuple(&table_catalog.name, &tuple)?;
-        let columns = table_catalog.schema_ref().clone();
+        let schema = table_catalog.schema_ref();
 
         assert_eq!(
-            TableCodec::decode_tuple(&table_catalog.types(), &[0, 1], &columns, &bytes),
+            TableCodec::decode_tuple(&table_catalog.types(), &[0, 1], schema, &bytes),
             tuple
         );
 
@@ -354,6 +352,7 @@ mod tests {
             id: 0,
             column_ids: vec![0],
             table_name: Arc::new("T1".to_string()),
+            pk_ty: LogicalType::Integer,
             name: "index_1".to_string(),
             is_unique: false,
             is_primary: false,
@@ -373,10 +372,13 @@ mod tests {
             id: 0,
             column_values: vec![Arc::new(DataValue::Int32(Some(0)))],
         };
-        let tuple_ids = vec![Arc::new(DataValue::Int32(Some(0)))];
-        let (_, bytes) = TableCodec::encode_index(&table_catalog.name, &index, &tuple_ids)?;
+        let tuple_id = Arc::new(DataValue::Int32(Some(0)));
+        let (_, bytes) = TableCodec::encode_index(&table_catalog.name, &index, &tuple_id)?;
 
-        assert_eq!(TableCodec::decode_index(&bytes)?, tuple_ids);
+        assert_eq!(
+            TableCodec::decode_index(&bytes, &tuple_id.logical_type()),
+            tuple_id
+        );
 
         Ok(())
     }
@@ -449,6 +451,7 @@ mod tests {
             id: index_id as u32,
             column_ids: vec![],
             table_name: Arc::new(table_name.to_string()),
+            pk_ty: LogicalType::Integer,
             name: "".to_string(),
             is_unique: false,
             is_primary: false,
diff --git a/src/types/index.rs b/src/types/index.rs
index db87b598..66fce027 100644
--- a/src/types/index.rs
+++ b/src/types/index.rs
@@ -1,7 +1,7 @@
 use crate::catalog::TableName;
 use crate::expression::simplify::ConstantBinary;
 use crate::types::value::ValueRef;
-use crate::types::ColumnId;
+use crate::types::{ColumnId, LogicalType};
 use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 use std::fmt;
@@ -22,6 +22,7 @@ pub struct IndexMeta {
     pub id: IndexId,
     pub column_ids: Vec<ColumnId>,
     pub table_name: TableName,
+    pub pk_ty: LogicalType,
     pub name: String,
     pub is_unique: bool,
     pub is_primary: bool,
diff --git a/src/types/tuple.rs b/src/types/tuple.rs
index 7d748c79..b4acbd52 100644
--- a/src/types/tuple.rs
+++ b/src/types/tuple.rs
@@ -15,7 +15,6 @@ pub type SchemaRef = Arc<Schema>;
 #[derive(Clone, Debug, PartialEq)]
 pub struct Tuple {
     pub id: Option<TupleId>,
-    pub schema_ref: SchemaRef,
     pub values: Vec<ValueRef>,
 }
 
@@ -23,17 +22,17 @@ impl Tuple {
     pub fn deserialize_from(
         table_types: &[LogicalType],
         projections: &[usize],
-        tuple_schema_ref: &SchemaRef,
+        schema: &Schema,
         bytes: &[u8],
     ) -> Self {
-        assert!(!tuple_schema_ref.is_empty());
-        assert_eq!(projections.len(), tuple_schema_ref.len());
+        assert!(!schema.is_empty());
+        assert_eq!(projections.len(), schema.len());
 
         fn is_none(bits: u8, i: usize) -> bool {
             bits & (1 << (7 - i)) > 0
         }
 
-        let values_len = tuple_schema_ref.len();
+        let values_len = schema.len();
         let mut tuple_values = Vec::with_capacity(values_len);
         let bits_len = (values_len + BITS_MAX_INDEX) / BITS_MAX_INDEX;
         let mut id_option = None;
@@ -42,18 +41,13 @@ impl Tuple {
         let mut pos = bits_len;
 
         for (i, logic_type) in table_types.iter().enumerate() {
-            if projection_i >= tuple_schema_ref.len() {
+            if projection_i >= values_len {
                 break;
             }
             if is_none(bytes[i / BITS_MAX_INDEX], i % BITS_MAX_INDEX) {
                 if projections[projection_i] == i {
                     tuple_values.push(Arc::new(DataValue::none(logic_type)));
-                    Self::values_push(
-                        tuple_schema_ref,
-                        &tuple_values,
-                        &mut id_option,
-                        &mut projection_i,
-                    );
+                    Self::values_push(schema, &tuple_values, &mut id_option, &mut projection_i);
                 }
             } else if let Some(len) = logic_type.raw_len() {
                 /// fixed length (e.g.: int)
@@ -62,12 +56,7 @@ impl Tuple {
                         &bytes[pos..pos + len],
                         logic_type,
                     )));
-                    Self::values_push(
-                        tuple_schema_ref,
-                        &tuple_values,
-                        &mut id_option,
-                        &mut projection_i,
-                    );
+                    Self::values_push(schema, &tuple_values, &mut id_option, &mut projection_i);
                 }
                 pos += len;
             } else {
@@ -79,12 +68,7 @@ impl Tuple {
                         &bytes[pos..pos + len],
                         logic_type,
                     )));
-                    Self::values_push(
-                        tuple_schema_ref,
-                        &tuple_values,
-                        &mut id_option,
-                        &mut projection_i,
-                    );
+                    Self::values_push(schema, &tuple_values, &mut id_option, &mut projection_i);
                 }
                 pos += len;
             }
@@ -92,13 +76,12 @@ impl Tuple {
 
         Tuple {
             id: id_option,
-            schema_ref: tuple_schema_ref.clone(),
             values: tuple_values,
         }
     }
 
     fn values_push(
-        tuple_columns: &Arc<Vec<ColumnRef>>,
+        tuple_columns: &Schema,
         tuple_values: &[ValueRef],
         id_option: &mut Option<Arc<DataValue>>,
         projection_i: &mut usize,
@@ -137,7 +120,7 @@ impl Tuple {
     }
 }
 
-pub fn create_table(tuples: &[Tuple]) -> Table {
+pub fn create_table(schema: &Schema, tuples: &[Tuple]) -> Table {
     let mut table = Table::new();
 
     if tuples.is_empty() {
@@ -145,12 +128,14 @@ pub fn create_table(tuples: &[Tuple]) -> Table {
     }
 
     let mut header = Vec::new();
-    for col in tuples[0].schema_ref.iter() {
+    for col in schema.iter() {
         header.push(Cell::new(col.full_name()));
     }
     table.set_header(header);
 
     for tuple in tuples {
+        assert_eq!(schema.len(), tuple.values.len());
+
         let cells = tuple
             .values
             .iter()
@@ -246,7 +231,6 @@ mod tests {
         let tuples = vec![
             Tuple {
                 id: Some(Arc::new(DataValue::Int32(Some(0)))),
-                schema_ref: columns.clone(),
                 values: vec![
                     Arc::new(DataValue::Int32(Some(0))),
                     Arc::new(DataValue::UInt32(Some(1))),
@@ -265,7 +249,6 @@ mod tests {
             },
             Tuple {
                 id: Some(Arc::new(DataValue::Int32(Some(1)))),
-                schema_ref: columns.clone(),
                 values: vec![
                     Arc::new(DataValue::Int32(Some(1))),
                     Arc::new(DataValue::UInt32(None)),
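`deserialize_from` keeps its leading null bitmap: bit `i` (most-significant bit first) of the header bytes marks column `i` as NULL, so NULL columns occupy no payload bytes. The check is the `is_none` helper visible above, reproduced here with a tiny usage example:

    fn is_none(bits: u8, i: usize) -> bool {
        bits & (1 << (7 - i)) > 0
    }

    fn main() {
        // 0b0100_0000: of the first eight columns, only column 1 is NULL.
        let header = 0b0100_0000u8;
        assert!(!is_none(header, 0));
        assert!(is_none(header, 1));
    }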
diff --git a/src/types/tuple_builder.rs b/src/types/tuple_builder.rs
index 5bbb225a..82f0aef9 100644
--- a/src/types/tuple_builder.rs
+++ b/src/types/tuple_builder.rs
@@ -1,69 +1,45 @@
-use crate::catalog::ColumnCatalog;
 use crate::errors::DatabaseError;
-use crate::types::tuple::{SchemaRef, Tuple};
-use crate::types::value::{DataValue, ValueRef};
+use crate::types::tuple::{Schema, Tuple};
+use crate::types::value::DataValue;
 use std::sync::Arc;
 
 pub struct TupleBuilder<'a> {
-    schema_ref: &'a SchemaRef,
+    schema: &'a Schema,
 }
 
 impl<'a> TupleBuilder<'a> {
-    pub fn new(schema_ref: &'a SchemaRef) -> Self {
-        TupleBuilder { schema_ref }
+    pub fn new(schema: &'a Schema) -> Self {
+        TupleBuilder { schema }
     }
 
-    pub fn build_result(header: String, message: String) -> Result<Tuple, DatabaseError> {
-        let columns = Arc::new(vec![Arc::new(ColumnCatalog::new_dummy(header))]);
+    pub fn build_result(message: String) -> Tuple {
         let values = vec![Arc::new(DataValue::Utf8(Some(message)))];
 
-        Ok(Tuple {
-            id: None,
-            schema_ref: columns,
-            values,
-        })
-    }
-
-    pub fn build(
-        &self,
-        id: Option<TupleId>,
-        values: Vec<ValueRef>,
-    ) -> Result<Tuple, DatabaseError> {
-        if values.len() != self.schema_ref.len() {
-            return Err(DatabaseError::MisMatch("types", "values"));
-        }
-
-        Ok(Tuple {
-            id,
-            schema_ref: self.schema_ref.clone(),
-            values,
-        })
+        Tuple { id: None, values }
     }
 
     pub fn build_with_row<'b>(
         &self,
         row: impl IntoIterator<Item = &'b str>,
     ) -> Result<Tuple, DatabaseError> {
-        let mut values = Vec::with_capacity(self.schema_ref.len());
+        let mut values = Vec::with_capacity(self.schema.len());
         let mut primary_key = None;
 
         for (i, value) in row.into_iter().enumerate() {
-            let data_value = Arc::new(
-                DataValue::Utf8(Some(value.to_string())).cast(self.schema_ref[i].datatype())?,
-            );
+            let data_value =
+                Arc::new(DataValue::Utf8(Some(value.to_string())).cast(self.schema[i].datatype())?);
 
-            if primary_key.is_none() && self.schema_ref[i].desc.is_primary {
+            if primary_key.is_none() && self.schema[i].desc.is_primary {
                 primary_key = Some(data_value.clone());
             }
             values.push(data_value);
         }
-        if values.len() != self.schema_ref.len() {
+        if values.len() != self.schema.len() {
             return Err(DatabaseError::MisMatch("types", "values"));
         }
 
         Ok(Tuple {
             id: primary_key,
-            schema_ref: self.schema_ref.clone(),
             values,
         })
    }
diff --git a/tests/slt/sql_2016/E061_11.slt b/tests/slt/sql_2016/E061_11.slt
index 9d204c44..19e9b691 100644
--- a/tests/slt/sql_2016/E061_11.slt
+++ b/tests/slt/sql_2016/E061_11.slt
@@ -1,8 +1,7 @@
 # E061-11: Subqueries in IN predicate
 
-# TODO: Support Subquery on `WHERE`
+statement ok
+CREATE TABLE TABLE_E061_11_01_01 ( ID INT PRIMARY KEY, A INT );
 
-# statement ok
-# CREATE TABLE TABLE_E061_11_01_01 ( ID INT PRIMARY KEY, A INT );
-
-# SELECT A FROM TABLE_E061_11_01_01 WHERE A IN ( SELECT 1 )
+query I
+SELECT A FROM TABLE_E061_11_01_01 WHERE A IN ( SELECT 1 );
diff --git a/tests/slt/sql_2016/E061_13.slt b/tests/slt/sql_2016/E061_13.slt
index 4b9534f7..5ffc02df 100644
--- a/tests/slt/sql_2016/E061_13.slt
+++ b/tests/slt/sql_2016/E061_13.slt
@@ -1,19 +1,19 @@
 # E061-13: Correlated subqueries
 
-# TODO: Support Subquery on `WHERE` with `IN/Not IN`
+statement ok
+CREATE TABLE TABLE_E061_13_01_011 ( ID INT PRIMARY KEY, A INT );
 
-# statement ok
-# CREATE TABLE TABLE_E061_13_01_011 ( ID INT PRIMARY KEY, A INT );
+statement ok
+CREATE TABLE TABLE_E061_13_01_012 ( ID INT PRIMARY KEY, B INT );
 
-# statement ok
-# CREATE TABLE TABLE_E061_13_01_012 ( ID INT PRIMARY KEY, B INT );
+query I
+SELECT A FROM TABLE_E061_13_01_011 WHERE A IN ( SELECT B FROM TABLE_E061_13_01_012 WHERE B = A );
 
-# SELECT A FROM TABLE_E061_13_01_011 WHERE A IN ( SELECT B FROM TABLE_E061_13_01_012 WHERE B = A )
+statement ok
+CREATE TABLE TABLE_E061_13_01_021 ( ID INT PRIMARY KEY, A INT );
 
-# statement ok
-# CREATE TABLE TABLE_E061_13_01_021 ( ID INT PRIMARY KEY, A INT );
+statement ok
+CREATE TABLE TABLE_E061_13_01_022 ( ID INT PRIMARY KEY, B INT );
 
-# statement ok
-# CREATE TABLE TABLE_E061_13_01_022 ( ID INT PRIMARY KEY, B INT );
-
-# SELECT A FROM TABLE_E061_13_01_021 WHERE A NOT IN ( SELECT B FROM TABLE_E061_13_01_022 WHERE B = A )
+query I
+SELECT A FROM TABLE_E061_13_01_021 WHERE A NOT IN ( SELECT B FROM TABLE_E061_13_01_022 WHERE B = A );
diff --git a/tests/slt/sql_2016/E091_01.slt b/tests/slt/sql_2016/E091_01.slt
index 8c1b5b86..f3c72249 100644
--- a/tests/slt/sql_2016/E091_01.slt
+++ b/tests/slt/sql_2016/E091_01.slt
@@ -5,3 +5,5 @@ CREATE TABLE TABLE_E091_01_01_01 ( ID INT PRIMARY KEY, A FLOAT );
 
 query F
 SELECT AVG ( A ) FROM TABLE_E091_01_01_01
+----
+0.0
diff --git a/tests/slt/sql_2016/E091_02.slt b/tests/slt/sql_2016/E091_02.slt
index 265184b1..9cb40e34 100644
--- a/tests/slt/sql_2016/E091_02.slt
+++ b/tests/slt/sql_2016/E091_02.slt
@@ -5,9 +5,13 @@ CREATE TABLE TABLE_E091_02_01_01 ( ID INT PRIMARY KEY, A FLOAT );
 
 query I
 SELECT COUNT ( * ) FROM TABLE_E091_02_01_01
+----
+0
 
 statement ok
 CREATE TABLE TABLE_E091_02_01_02 ( ID INT PRIMARY KEY, A FLOAT );
 
 query I
 SELECT COUNT ( A ) FROM TABLE_E091_02_01_02
+----
+0
diff --git a/tests/slt/sql_2016/E091_03.slt b/tests/slt/sql_2016/E091_03.slt
index dde3f42a..f7d428fc 100644
--- a/tests/slt/sql_2016/E091_03.slt
+++ b/tests/slt/sql_2016/E091_03.slt
@@ -5,3 +5,5 @@ CREATE TABLE TABLE_E091_03_01_01 ( ID INT PRIMARY KEY, A FLOAT );
 
 query F
 SELECT MAX ( A ) FROM TABLE_E091_03_01_01
+----
+null
\ No newline at end of file
diff --git a/tests/slt/sql_2016/E091_04.slt b/tests/slt/sql_2016/E091_04.slt
index f8f0e3f0..199fa097 100644
--- a/tests/slt/sql_2016/E091_04.slt
+++ b/tests/slt/sql_2016/E091_04.slt
@@ -5,3 +5,5 @@ CREATE TABLE TABLE_E091_04_01_01 ( ID INT PRIMARY KEY, A FLOAT );
 
 query F
 SELECT MIN ( A ) FROM TABLE_E091_04_01_01
+----
+null
\ No newline at end of file
diff --git a/tests/slt/sql_2016/E091_05.slt b/tests/slt/sql_2016/E091_05.slt
index eb208adb..03d3ba99 100644
--- a/tests/slt/sql_2016/E091_05.slt
+++ b/tests/slt/sql_2016/E091_05.slt
@@ -5,3 +5,5 @@ CREATE TABLE TABLE_E091_05_01_01 ( ID INT PRIMARY KEY, A FLOAT );
 
 query F
 SELECT SUM ( A ) FROM TABLE_E091_05_01_01
+----
+0.0
\ No newline at end of file
diff --git a/tests/slt/sql_2016/E091_06.slt b/tests/slt/sql_2016/E091_06.slt
index 6f0fcabd..a223d091 100644
--- a/tests/slt/sql_2016/E091_06.slt
+++ b/tests/slt/sql_2016/E091_06.slt
@@ -5,27 +5,37 @@ CREATE TABLE TABLE_E091_06_01_01 ( ID INT PRIMARY KEY, A FLOAT );
 
 query F
 SELECT AVG ( ALL A ) FROM TABLE_E091_06_01_01
+----
+0.0
 
 statement ok
 CREATE TABLE TABLE_E091_06_02_01 ( ID INT PRIMARY KEY, A FLOAT );
 
 query F
 SELECT COUNT ( ALL A ) FROM TABLE_E091_06_02_01
+----
+0
 
 statement ok
 CREATE TABLE TABLE_E091_06_03_01 ( ID INT PRIMARY KEY, A FLOAT );
 
 query F
 SELECT MAX ( ALL A ) FROM TABLE_E091_06_03_01
+----
+null
 
 statement ok
 CREATE TABLE TABLE_E091_06_04_01 ( ID INT PRIMARY KEY, A FLOAT );
 
 query F
 SELECT MIN ( ALL A ) FROM TABLE_E091_06_04_01
+----
+null
 
 statement ok
 CREATE TABLE TABLE_E091_06_05_01 ( ID INT PRIMARY KEY, A FLOAT );
 
 query F
 SELECT SUM ( ALL A ) FROM TABLE_E091_06_05_01
+----
+0.0
diff --git a/tests/slt/sql_2016/E091_07.slt b/tests/slt/sql_2016/E091_07.slt
index e6b18c12..3def4e2a 100644
--- a/tests/slt/sql_2016/E091_07.slt
+++ b/tests/slt/sql_2016/E091_07.slt
@@ -5,27 +5,37 @@ CREATE TABLE TABLE_E091_07_01_01 ( ID INT PRIMARY KEY, A FLOAT );
 
 query F
 SELECT AVG ( DISTINCT A ) FROM TABLE_E091_07_01_01
+----
+0.0
 
 statement ok
 CREATE TABLE TABLE_E091_07_01_02 ( ID INT PRIMARY KEY, A FLOAT );
 
 query I
 SELECT COUNT ( DISTINCT A ) FROM TABLE_E091_07_01_02
+----
+0
 
 statement ok
 CREATE TABLE TABLE_E091_07_01_03 ( ID INT PRIMARY KEY, A FLOAT );
 
 query F
 SELECT MAX ( DISTINCT A ) FROM TABLE_E091_07_01_03
+----
+null
 
 statement ok
 CREATE TABLE TABLE_E091_07_01_04 ( ID INT PRIMARY KEY, A FLOAT );
 
 query F
 SELECT MIN ( DISTINCT A ) FROM TABLE_E091_07_01_04
+----
+null
 
 statement ok
 CREATE TABLE TABLE_E091_07_01_05 ( ID INT PRIMARY KEY, A FLOAT );
 
 query F
 SELECT SUM ( DISTINCT A ) FROM TABLE_E091_07_01_05
+----
+0.0
diff --git a/tests/slt/subquery.slt b/tests/slt/subquery.slt
index 5df37dbc..bc0ea9ac 100644
--- a/tests/slt/subquery.slt
+++ b/tests/slt/subquery.slt
@@ -44,5 +44,30 @@ select * from t1 where a <= (select 4) and (-a + 1) < (select 1) - 1
 ----
 1 3 4
 
+statement ok
+insert into t1 values (2, 3, 3), (3, 1, 4);
+
+query III
+select * from t1 where a in (select 1)
+----
+0 1 2
+3 1 4
+
+query III
+select * from t1 where a in (select 1) and b = 4
+----
+3 1 4
+
+query III
+select * from t1 where a not in (select 1)
+----
+1 3 4
+2 3 3
+
+query III
+select * from t1 where a not in (select 1) and b = 3
+----
+2 3 3
+
 statement ok
 drop table t1;
\ No newline at end of file
diff --git a/tests/sqllogictest/src/lib.rs b/tests/sqllogictest/src/lib.rs
index cd392a8e..a6b8c505 100644
--- a/tests/sqllogictest/src/lib.rs
+++ b/tests/sqllogictest/src/lib.rs
@@ -15,7 +15,7 @@ impl AsyncDB for SQLBase {
     async fn run(&mut self, sql: &str) -> Result<DBOutput<Self::ColumnType>, Self::Error> {
         let start = Instant::now();
-        let tuples = self.db.run(sql).await?;
+        let (schema, tuples) = self.db.run(sql).await?;
         println!("|— Input SQL: {}", sql);
         println!(" |— time spent: {:?}", start.elapsed());
@@ -23,7 +23,7 @@ impl AsyncDB for SQLBase {
             return Ok(DBOutput::StatementComplete(0));
         }
-        let types = vec![DefaultColumnType::Any; tuples[0].schema_ref.len()];
+        let types = vec![DefaultColumnType::Any; schema.len()];
         let rows = tuples
             .into_iter()
             .map(|tuple| {