From eb9ab5a53c9359d7bb4c8c617aa8ba2c602495c2 Mon Sep 17 00:00:00 2001 From: junxiangMu <63799833+guojidan@users.noreply.github.com> Date: Tue, 5 Dec 2023 20:37:45 +0800 Subject: [PATCH] Alter table (#104) * alter table * refactor(column): remove field: `table_name` * feat(create_table): add `default` on `CreateTable` * feat(cast): support `CAST` Function * feat: Constraints for CreateTable and Fix `Or` BinaryOperator in scope_aggregation cause index scan error * version up * feat(create_table): support `if not exists` * version up * code fmt * support add column * fix conflict * split operator * optimize * optimize * fix tuple decode err * fmt --------- Co-authored-by: Kould <2435992353@qq.com> --- src/binder/alter_table.rs | 71 +++++++++++++++++++ src/binder/create_table.rs | 2 +- src/binder/insert.rs | 1 - src/binder/mod.rs | 2 + .../executor/ddl/alter_table/add_column.rs | 61 ++++++++++++++++ src/execution/executor/ddl/alter_table/mod.rs | 1 + src/execution/executor/ddl/mod.rs | 1 + src/execution/executor/mod.rs | 6 ++ src/optimizer/rule/column_pruning.rs | 3 +- .../operator/alter_table/add_column.rs | 8 +++ src/planner/operator/alter_table/mod.rs | 1 + src/planner/operator/mod.rs | 7 +- src/storage/kip.rs | 53 ++++++++++++++ src/storage/mod.rs | 12 +++- tests/slt/alter_table.slt | 19 +++++ 15 files changed, 242 insertions(+), 6 deletions(-) create mode 100644 src/binder/alter_table.rs create mode 100644 src/execution/executor/ddl/alter_table/add_column.rs create mode 100644 src/execution/executor/ddl/alter_table/mod.rs create mode 100644 src/planner/operator/alter_table/add_column.rs create mode 100644 src/planner/operator/alter_table/mod.rs create mode 100644 tests/slt/alter_table.slt diff --git a/src/binder/alter_table.rs b/src/binder/alter_table.rs new file mode 100644 index 00000000..139e0cdb --- /dev/null +++ b/src/binder/alter_table.rs @@ -0,0 +1,71 @@ +use sqlparser::ast::{AlterTableOperation, ObjectName}; + +use std::sync::Arc; + +use super::Binder; 
+use crate::binder::{lower_case_name, split_name, BindError};
+use crate::planner::operator::alter_table::add_column::AddColumnOperator;
+use crate::planner::operator::scan::ScanOperator;
+use crate::planner::operator::Operator;
+use crate::planner::LogicalPlan;
+use crate::storage::Transaction;
+
+impl<'a, T: Transaction> Binder<'a, T> {
+    /// Binds an `ALTER TABLE` statement into a [`LogicalPlan`].
+    ///
+    /// The table name is derived from the lower-cased `name`. Only `ADD COLUMN` is
+    /// supported: it yields an `Operator::AddColumn` node whose single child is a
+    /// scan built over the target table.
+    ///
+    /// # Errors
+    /// Returns `BindError::InvalidTable` when the table is not found in the binder
+    /// context, and propagates errors from `split_name` and `bind_column`.
+    ///
+    /// # Panics
+    /// Every operation other than `ADD COLUMN` currently hits `todo!()`.
+    pub(crate) fn bind_alter_table(
+        &mut self,
+        name: &ObjectName,
+        operation: &AlterTableOperation,
+    ) -> Result<LogicalPlan, BindError> {
+        let table_name: Arc<String> = Arc::new(split_name(&lower_case_name(name))?.1.to_string());
+
+        let plan = match operation {
+            AlterTableOperation::AddColumn {
+                if_not_exists,
+                column_def,
+                ..
+            } => {
+                if let Some(table) = self.context.table(table_name.clone()) {
+                    let plan = ScanOperator::build(table_name.clone(), table);
+
+                    LogicalPlan {
+                        operator: Operator::AddColumn(AddColumnOperator {
+                            table_name,
+                            if_not_exists: *if_not_exists,
+                            column: self.bind_column(column_def)?,
+                        }),
+                        childrens: vec![plan],
+                    }
+                } else {
+                    return Err(BindError::InvalidTable(format!(
+                        "not found table {}",
+                        table_name
+                    )));
+                }
+            }
+            // Unsupported so far. The explicit arms only document what is planned;
+            // the trailing wildcard already covers them.
+            // NOTE(review): these panic on user-supplied SQL; consider a `BindError`.
+            AlterTableOperation::DropColumn { .. } => todo!(),
+            AlterTableOperation::DropPrimaryKey => todo!(),
+            AlterTableOperation::RenameColumn { .. } => todo!(),
+            AlterTableOperation::RenameTable { .. } => todo!(),
+            AlterTableOperation::ChangeColumn { .. } => todo!(),
+            AlterTableOperation::AlterColumn { .. } => todo!(),
+            _ => todo!(),
+        };
+
+        Ok(plan)
+    }
+}
diff --git a/src/binder/create_table.rs b/src/binder/create_table.rs
index ae977423..abe20525 100644
--- a/src/binder/create_table.rs
+++ b/src/binder/create_table.rs
@@ -82,7 +82,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
         Ok(plan)
     }
 
-    fn bind_column(&mut self, column_def: &ColumnDef) -> Result<ColumnCatalog, BindError> {
+    pub(crate) fn bind_column(&mut self, column_def: 
&ColumnDef) -> Result<ColumnCatalog, BindError> {
         let column_name = column_def.name.to_string();
         let mut column_desc = ColumnDesc::new(
             LogicalType::try_from(column_def.data_type.clone())?,
diff --git a/src/binder/insert.rs b/src/binder/insert.rs
index 81a3bd73..6cc35555 100644
--- a/src/binder/insert.rs
+++ b/src/binder/insert.rs
@@ -42,7 +42,6 @@ impl<'a, T: Transaction> Binder<'a, T> {
             }
         }
         let mut rows = Vec::with_capacity(expr_rows.len());
-
         for expr_row in expr_rows {
             let mut row = Vec::with_capacity(expr_row.len());
 
diff --git a/src/binder/mod.rs b/src/binder/mod.rs
index 816c895c..5a7780fe 100644
--- a/src/binder/mod.rs
+++ b/src/binder/mod.rs
@@ -1,4 +1,5 @@
 pub mod aggregate;
+mod alter_table;
 pub mod copy;
 mod create_table;
 mod delete;
@@ -119,6 +120,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
     pub fn bind(mut self, stmt: &Statement) -> Result<LogicalPlan, BindError> {
         let plan = match stmt {
             Statement::Query(query) => self.bind_query(query)?,
+            Statement::AlterTable { name, operation } => self.bind_alter_table(name, operation)?,
             Statement::CreateTable {
                 name,
                 columns,
diff --git a/src/execution/executor/ddl/alter_table/add_column.rs b/src/execution/executor/ddl/alter_table/add_column.rs
new file mode 100644
index 00000000..b475efad
--- /dev/null
+++ b/src/execution/executor/ddl/alter_table/add_column.rs
@@ -0,0 +1,81 @@
+use crate::execution::executor::BoxedExecutor;
+use crate::types::tuple::Tuple;
+use crate::types::value::DataValue;
+use crate::{execution::ExecutorError, types::tuple_builder::TupleBuilder};
+use futures_async_stream::try_stream;
+use std::cell::RefCell;
+use std::sync::Arc;
+
+use crate::{
+    execution::executor::Executor, planner::operator::alter_table::add_column::AddColumnOperator,
+    storage::Transaction,
+};
+
+/// Executor for `ALTER TABLE ... ADD COLUMN`.
+///
+/// Registers the new column through the transaction, then rewrites every tuple
+/// yielded by `input` with the new column appended.
+pub struct AddColumn {
+    /// Plan node carrying the table name, the column and the `IF NOT EXISTS` flag.
+    op: AddColumnOperator,
+    /// Child executor producing the tuples to rewrite.
+    input: BoxedExecutor,
+}
+
+impl From<(AddColumnOperator, BoxedExecutor)> for AddColumn {
+    fn from((op, input): (AddColumnOperator, BoxedExecutor)) -> Self {
+        Self { op, input }
+    }
+}
+
+impl<T: Transaction> Executor<T> for AddColumn {
+    fn execute(self, transaction: &RefCell<T>) -> BoxedExecutor {
+        // SAFETY: `RefCell::as_ptr` never returns a null pointer, so `unwrap` cannot fail.
+        // NOTE(review): the resulting `&mut T` bypasses `RefCell`'s borrow tracking; this
+        // is sound only while no other borrow of the transaction is live - confirm.
+        unsafe { self._execute(transaction.as_ptr().as_mut().unwrap()) }
+    }
+}
+
+impl AddColumn {
+    /// Adds the column to the table metadata, then back-fills the tuples from `input`.
+    ///
+    /// Each rewritten tuple gets the column's default value, or `NULL` when it has
+    /// none, and is stored with the overwrite flag set. Tuples that already carry a
+    /// column of the same name are left untouched. Yields one result tuple at the end.
+    #[try_stream(boxed, ok = Tuple, error = ExecutorError)]
+    async fn _execute<T: Transaction>(self, transaction: &mut T) {
+        transaction.add_column(&self.op)?;
+
+        let AddColumnOperator {
+            table_name, column, ..
+        } = &self.op;
+
+        #[for_await]
+        for tuple in self.input {
+            let mut tuple: Tuple = tuple?;
+
+            // `add_column` returns `Ok` without changes when the column already exists
+            // and `IF NOT EXISTS` was given; appending again would duplicate the column.
+            if tuple.columns.iter().any(|col| col.name() == column.name()) {
+                continue;
+            }
+
+            tuple.columns.push(Arc::new(column.clone()));
+            let value = column
+                .default_value()
+                .unwrap_or_else(|| Arc::new(DataValue::Null));
+            tuple.values.push(value);
+
+            // `true` is passed as the overwrite flag of `append`.
+            transaction.append(table_name, tuple, true)?;
+        }
+        // Drop the transaction's cache entry for this table (presumably the catalog
+        // that predates the new column - confirm against the storage implementation).
+        transaction.remove_cache(&table_name)?;
+
+        let tuple_builder = TupleBuilder::new_result();
+        let tuple = tuple_builder.push_result("ALTER TABLE SUCCESS", "1")?;
+
+        yield tuple;
+    }
+}
diff --git a/src/execution/executor/ddl/alter_table/mod.rs b/src/execution/executor/ddl/alter_table/mod.rs
new file mode 100644
index 00000000..4c008951
--- /dev/null
+++ b/src/execution/executor/ddl/alter_table/mod.rs
@@ -0,0 +1 @@
+pub mod add_column;
diff --git a/src/execution/executor/ddl/mod.rs b/src/execution/executor/ddl/mod.rs
index 9c5a45a1..4ec4ceef 100644
--- a/src/execution/executor/ddl/mod.rs
+++ b/src/execution/executor/ddl/mod.rs
@@ -1,3 +1,4 @@
+pub(crate) mod alter_table;
 pub(crate) mod create_table;
 pub(crate) mod drop_table;
 pub(crate) mod truncate;
diff --git a/src/execution/executor/mod.rs b/src/execution/executor/mod.rs
index fdc22b65..38334a8d 100644
--- a/src/execution/executor/mod.rs
+++ b/src/execution/executor/mod.rs
@@ -31,6 +31,8 @@ use futures::stream::BoxStream;
 use futures::TryStreamExt;
 use std::cell::RefCell;
 
+use self::ddl::alter_table::add_column::AddColumn;
+
 pub type BoxedExecutor = BoxStream<'static, Result<Tuple, ExecutorError>>;
 
 pub trait Executor<T: Transaction> {
@@ -104,6 +106,10 @@ pub fn build<T: Transaction>(plan: LogicalPlan, transaction: 
&RefCell<T>) -> Box
             Delete::from((op, input)).execute(transaction)
         }
         Operator::Values(op) => Values::from(op).execute(transaction),
+        Operator::AddColumn(op) => {
+            let input = build(childrens.remove(0), transaction);
+            AddColumn::from((op, input)).execute(transaction)
+        }
         Operator::CreateTable(op) => CreateTable::from(op).execute(transaction),
         Operator::DropTable(op) => DropTable::from(op).execute(transaction),
         Operator::Truncate(op) => Truncate::from(op).execute(transaction),
diff --git a/src/optimizer/rule/column_pruning.rs b/src/optimizer/rule/column_pruning.rs
index 86984530..ce16c9bf 100644
--- a/src/optimizer/rule/column_pruning.rs
+++ b/src/optimizer/rule/column_pruning.rs
@@ -115,7 +115,8 @@ impl ColumnPruning {
             | Operator::Truncate(_)
             | Operator::Show(_)
             | Operator::CopyFromFile(_)
-            | Operator::CopyToFile(_) => (),
+            | Operator::CopyToFile(_)
+            | Operator::AddColumn(_) => (),
         }
     }
 
diff --git a/src/planner/operator/alter_table/add_column.rs b/src/planner/operator/alter_table/add_column.rs
new file mode 100644
index 00000000..8862def4
--- /dev/null
+++ b/src/planner/operator/alter_table/add_column.rs
@@ -0,0 +1,12 @@
+use crate::catalog::{ColumnCatalog, TableName};
+
+/// Plan node for `ALTER TABLE ... ADD COLUMN`.
+#[derive(Debug, PartialEq, Clone)]
+pub struct AddColumnOperator {
+    /// Name of the table that receives the column.
+    pub table_name: TableName,
+    /// Set when the statement carried `IF NOT EXISTS`.
+    pub if_not_exists: bool,
+    /// Catalog entry of the column to add.
+    pub column: ColumnCatalog,
+}
diff --git a/src/planner/operator/alter_table/mod.rs b/src/planner/operator/alter_table/mod.rs
new file mode 100644
index 00000000..4c008951
--- /dev/null
+++ b/src/planner/operator/alter_table/mod.rs
@@ -0,0 +1 @@
+pub mod add_column;
diff --git a/src/planner/operator/mod.rs b/src/planner/operator/mod.rs
index e1d11aab..634957e4 100644
--- a/src/planner/operator/mod.rs
+++ b/src/planner/operator/mod.rs
@@ -1,4 +1,5 @@
 pub mod aggregate;
+pub mod alter_table;
 pub mod copy_from_file;
 pub mod copy_to_file;
 pub mod create_table;
@@ -31,8 +32,9 @@ use crate::planner::operator::values::ValuesOperator;
 use itertools::Itertools;
 
 use self::{
-
aggregate::AggregateOperator, filter::FilterOperator, join::JoinOperator, limit::LimitOperator,
-    project::ProjectOperator, scan::ScanOperator, sort::SortOperator,
+    aggregate::AggregateOperator, alter_table::add_column::AddColumnOperator,
+    filter::FilterOperator, join::JoinOperator, limit::LimitOperator, project::ProjectOperator,
+    scan::ScanOperator, sort::SortOperator,
 };
 
 #[derive(Debug, PartialEq, Clone)]
@@ -52,6 +54,7 @@ pub enum Operator {
     Update(UpdateOperator),
     Delete(DeleteOperator),
     // DDL
+    AddColumn(AddColumnOperator),
     CreateTable(CreateTableOperator),
     DropTable(DropTableOperator),
     Truncate(TruncateOperator),
diff --git a/src/storage/kip.rs b/src/storage/kip.rs
index d703d621..a195f45e 100644
--- a/src/storage/kip.rs
+++ b/src/storage/kip.rs
@@ -1,5 +1,6 @@
 use crate::catalog::{ColumnCatalog, ColumnRef, TableCatalog, TableName};
 use crate::expression::simplify::ConstantBinary;
+use crate::planner::operator::alter_table::add_column::AddColumnOperator;
 use crate::storage::table_codec::TableCodec;
 use crate::storage::{
     tuple_projection, Bounds, IndexIter, Iter, Projections, Storage, StorageError, Transaction,
@@ -161,6 +162,73 @@ impl Transaction for KipTransaction {
         Ok(())
     }
 
+    /// Adds the column described by `op` to the metadata of an existing table.
+    ///
+    /// The encoded column, plus a `uk_<column>` unique index meta when the column is
+    /// unique, is written into the transaction. Stored tuples are not modified here.
+    ///
+    /// # Errors
+    /// * `StorageError::TableNotFound` if the table is unknown.
+    /// * `StorageError::DuplicateColumn` if the column already exists and
+    ///   `if_not_exists` is false; with `if_not_exists` set the call is a no-op.
+    /// * `StorageError::NeedNullAble` if the column is neither nullable nor defaulted.
+    fn add_column(&mut self, op: &AddColumnOperator) -> Result<(), StorageError> {
+        let AddColumnOperator {
+            table_name,
+            if_not_exists,
+            column,
+        } = op;
+
+        // Work on a private copy of the catalog entry; what this method records is
+        // the encoded metadata written through `self.tx` below.
+        let mut catalog = self
+            .table(table_name.clone())
+            .cloned()
+            .ok_or(StorageError::TableNotFound)?;
+
+        // Check for an existing column first so that `IF NOT EXISTS` stays a no-op
+        // regardless of how the new column is declared.
+        if catalog
+            .all_columns()
+            .into_iter()
+            .any(|col| col.name() == column.name())
+        {
+            return if *if_not_exists {
+                Ok(())
+            } else {
+                Err(StorageError::DuplicateColumn)
+            };
+        }
+
+        // Existing rows need a value for the new column: either NULL or the default.
+        if !column.nullable && column.default_value().is_none() {
+            return Err(StorageError::NeedNullAble);
+        }
+
+        let col_id = catalog.add_column(column.clone())?;
+
+        if column.desc.is_unique {
+            let meta = IndexMeta {
+                id: 0,
+                column_ids: vec![col_id],
+                name: format!("uk_{}", column.name()),
+                is_unique: true,
+                is_primary: false,
+            };
+            let meta_ref = catalog.add_index_meta(meta);
+            let (key, value) = TableCodec::encode_index_meta(table_name, meta_ref)?;
+            self.tx.set(key, value);
+        }
+
+        let column = catalog
+            .get_column_by_id(&col_id)
+            .expect("column was added to the catalog above");
+        let (key, value) = TableCodec::encode_column(&table_name, column)?;
+        self.tx.set(key, value);
+
+        Ok(())
+    }
+
     fn create_table(
         &mut self,
         table_name: TableName,
@@ -266,6 +334,12 @@ impl Transaction for KipTransaction {
 
         Ok(())
     }
+
+    /// Removes the entry stored under `key` from this transaction's cache.
+    fn remove_cache(&self, key: &String) -> Result<(), StorageError> {
+        self.cache.remove(key);
+        Ok(())
+    }
 }
 
 impl KipTransaction {
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index b86fd4b2..dffc2c5f 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -4,6 +4,7 @@ mod table_codec;
 use crate::catalog::{CatalogError, ColumnCatalog, TableCatalog, TableName};
 use crate::expression::simplify::ConstantBinary;
 use crate::expression::ScalarExpression;
+use crate::planner::operator::alter_table::add_column::AddColumnOperator;
 use crate::storage::table_codec::TableCodec;
 use crate::types::errors::TypeError;
 use crate::types::index::{Index, IndexMetaRef};
@@ -67,7 +68,13 @@ pub trait Transaction: Sync + Send + 'static {
     ) -> Result<(), StorageError>;
 
     fn delete(&mut self, table_name: &str, tuple_id: TupleId) -> Result<(), StorageError>;
 
+    /// Adds the column described by `op` to an existing table.
+    ///
+    /// Implementations report an unknown table, a duplicate column or a column that
+    /// is neither nullable nor defaulted through `StorageError`.
+    fn add_column(&mut self, op: &AddColumnOperator) -> Result<(), StorageError>;
+
     fn create_table(
         &mut self,
         table_name: TableName,
@@ -83,6 +90,13 @@ pub trait Transaction: Sync + Send + 'static {
 
     #[allow(async_fn_in_trait)]
     async fn commit(self) -> Result<(), StorageError>;
+
+    /// Drops whatever the transaction caches under `_key`.
+    ///
+    /// The default implementation does nothing.
+    fn remove_cache(&self, _key: &String) -> Result<(), StorageError> {
+        Ok(())
+    }
 }
 
 enum IndexValue {
@@ -313,6 +327,12 @@ pub enum StorageError {
     #[error("The table not found")]
     TableNotFound,
 
+    #[error("The column already exists")]
+    DuplicateColumn,
+
+    #[error("The added column must be nullable or have a default value")]
+    NeedNullAble,
+
     #[error("The table already exists")]
     TableExists,
 }
diff 
--git a/tests/slt/alter_table.slt b/tests/slt/alter_table.slt
new file mode 100644
index 00000000..ded8ff9c
--- /dev/null
+++ b/tests/slt/alter_table.slt
@@ -0,0 +1,20 @@
+statement ok
+create table alter_table(id int primary key, v1 int)
+
+statement ok
+insert into alter_table values (1,1), (2,2), (3,3), (4,4)
+
+statement ok
+alter table alter_table add column da int null
+
+# existing rows are back-filled with null for the new column
+query III rowsort
+select * from alter_table
+----
+1 1 null
+2 2 null
+3 3 null
+4 4 null
+
+statement ok
+drop table alter_table