diff --git a/src/binder/alter_table.rs b/src/binder/alter_table.rs index 882695d7..33ad3035 100644 --- a/src/binder/alter_table.rs +++ b/src/binder/alter_table.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use super::Binder; use crate::binder::{lower_case_name, split_name, BindError}; -use crate::planner::operator::alter_table::{AddColumn, AlterTableOperator}; +use crate::planner::operator::alter_table::AddColumnOperator; use crate::planner::operator::scan::ScanOperator; use crate::planner::operator::Operator; use crate::planner::LogicalPlan; @@ -18,8 +18,6 @@ impl<'a, T: Transaction> Binder<'a, T> { ) -> Result { let table_name: Arc = Arc::new(split_name(&lower_case_name(name))?.1.to_string()); - // we need convert ColumnDef to ColumnCatalog - let plan = match operation { AlterTableOperation::AddColumn { column_keyword: _, @@ -30,11 +28,11 @@ impl<'a, T: Transaction> Binder<'a, T> { let plan = ScanOperator::build(table_name.clone(), table); LogicalPlan { - operator: Operator::AlterTable(AlterTableOperator::AddColumn(AddColumn { + operator: Operator::AddColumn(AddColumnOperator { table_name, if_not_exists: *if_not_exists, column: self.bind_column(column_def)?, - })), + }), childrens: vec![plan], } } else { diff --git a/src/binder/insert.rs b/src/binder/insert.rs index 113aa76b..6cc35555 100644 --- a/src/binder/insert.rs +++ b/src/binder/insert.rs @@ -42,7 +42,6 @@ impl<'a, T: Transaction> Binder<'a, T> { } } let mut rows = Vec::with_capacity(expr_rows.len()); - println!("{:?}", expr_rows); for expr_row in expr_rows { let mut row = Vec::with_capacity(expr_row.len()); diff --git a/src/execution/executor/ddl/alter_table.rs b/src/execution/executor/ddl/alter_table.rs index 015f701a..51010f37 100644 --- a/src/execution/executor/ddl/alter_table.rs +++ b/src/execution/executor/ddl/alter_table.rs @@ -1,58 +1,55 @@ use crate::execution::executor::BoxedExecutor; -use crate::planner::operator::alter_table::AddColumn; use crate::types::tuple::Tuple; use crate::types::value::DataValue; use crate::{execution::ExecutorError, types::tuple_builder::TupleBuilder}; use futures_async_stream::try_stream; use std::cell::RefCell; -use std::ops::Deref; use std::sync::Arc; use crate::{ - execution::executor::Executor, planner::operator::alter_table::AlterTableOperator, + execution::executor::Executor, planner::operator::alter_table::AddColumnOperator, storage::Transaction, }; -pub struct AlterTable { - op: AlterTableOperator, +pub struct AddColumn { + op: AddColumnOperator, input: BoxedExecutor, } -impl From<(AlterTableOperator, BoxedExecutor)> for AlterTable { - fn from((op, input): (AlterTableOperator, BoxedExecutor)) -> Self { +impl From<(AddColumnOperator, BoxedExecutor)> for AddColumn { + fn from((op, input): (AddColumnOperator, BoxedExecutor)) -> Self { Self { op, input } } } -impl Executor for AlterTable { +impl Executor for AddColumn { fn execute(self, transaction: &RefCell) -> crate::execution::executor::BoxedExecutor { unsafe { self._execute(transaction.as_ptr().as_mut().unwrap()) } } } -impl AlterTable { +impl AddColumn { #[try_stream(boxed, ok = Tuple, error = ExecutorError)] async fn _execute(self, transaction: &mut T) { - let _ = transaction.alter_table(&self.op)?; + let _ = transaction.add_column(&self.op)?; - if let AlterTableOperator::AddColumn(AddColumn { + let AddColumnOperator { table_name, column, .. - }) = &self.op - { - #[for_await] - for tuple in self.input { - let mut tuple: Tuple = tuple?; - let is_overwrite = true; - - tuple.columns.push(Arc::new(column.clone())); - if let Some(value) = column.default_value() { - tuple.values.push(Arc::new(value.deref().clone())); - } else { - tuple.values.push(Arc::new(DataValue::Null)); - } - - transaction.append(table_name, tuple, is_overwrite)?; + } = &self.op; + + #[for_await] + for tuple in self.input { + let mut tuple: Tuple = tuple?; + let is_overwrite = true; + + tuple.columns.push(Arc::new(column.clone())); + if let Some(value) = column.default_value() { + tuple.values.push(value); + } else { + tuple.values.push(Arc::new(DataValue::Null)); } + + transaction.append(table_name, tuple, is_overwrite)?; } let tuple_builder = TupleBuilder::new_result(); diff --git a/src/execution/executor/mod.rs b/src/execution/executor/mod.rs index 9877e91c..caa605b9 100644 --- a/src/execution/executor/mod.rs +++ b/src/execution/executor/mod.rs @@ -31,7 +31,7 @@ use futures::stream::BoxStream; use futures::TryStreamExt; use std::cell::RefCell; -use self::ddl::alter_table::AlterTable; +use self::ddl::alter_table::AddColumn; pub type BoxedExecutor = BoxStream<'static, Result>; @@ -106,9 +106,9 @@ pub fn build(plan: LogicalPlan, transaction: &RefCell) -> Box Delete::from((op, input)).execute(transaction) } Operator::Values(op) => Values::from(op).execute(transaction), - Operator::AlterTable(op) => { + Operator::AddColumn(op) => { let input = build(childrens.remove(0), transaction); - AlterTable::from((op, input)).execute(transaction) + AddColumn::from((op, input)).execute(transaction) } Operator::CreateTable(op) => CreateTable::from(op).execute(transaction), Operator::DropTable(op) => DropTable::from(op).execute(transaction), diff --git a/src/optimizer/rule/column_pruning.rs b/src/optimizer/rule/column_pruning.rs index c27516b4..ce16c9bf 100644 --- a/src/optimizer/rule/column_pruning.rs +++ b/src/optimizer/rule/column_pruning.rs @@ -116,7 +116,7 @@ impl ColumnPruning { | Operator::Show(_) | Operator::CopyFromFile(_) | Operator::CopyToFile(_) - | Operator::AlterTable(_) => (), + | Operator::AddColumn(_) => (), } } diff --git a/src/planner/operator/alter_table.rs b/src/planner/operator/alter_table.rs index 143d3072..8862def4 100644 --- a/src/planner/operator/alter_table.rs +++ b/src/planner/operator/alter_table.rs @@ -1,18 +1,7 @@ use crate::catalog::{ColumnCatalog, TableName}; #[derive(Debug, PartialEq, Clone)] -pub enum AlterTableOperator { - AddColumn(AddColumn), - DropColumn, - DropPrimaryKey, - RenameColumn, - RenameTable, - ChangeColumn, - AlterColumn, -} - -#[derive(Debug, PartialEq, Clone)] -pub struct AddColumn { +pub struct AddColumnOperator { pub table_name: TableName, pub if_not_exists: bool, pub column: ColumnCatalog, diff --git a/src/planner/operator/mod.rs b/src/planner/operator/mod.rs index 0747f184..487b46d1 100644 --- a/src/planner/operator/mod.rs +++ b/src/planner/operator/mod.rs @@ -32,7 +32,7 @@ use crate::planner::operator::values::ValuesOperator; use itertools::Itertools; use self::{ - aggregate::AggregateOperator, alter_table::AlterTableOperator, filter::FilterOperator, + aggregate::AggregateOperator, alter_table::AddColumnOperator, filter::FilterOperator, join::JoinOperator, limit::LimitOperator, project::ProjectOperator, scan::ScanOperator, sort::SortOperator, }; @@ -54,7 +54,7 @@ pub enum Operator { Update(UpdateOperator), Delete(DeleteOperator), // DDL - AlterTable(AlterTableOperator), + AddColumn(AddColumnOperator), CreateTable(CreateTableOperator), DropTable(DropTableOperator), Truncate(TruncateOperator), diff --git a/src/storage/kip.rs b/src/storage/kip.rs index 2be825c0..af5eb273 100644 --- a/src/storage/kip.rs +++ b/src/storage/kip.rs @@ -1,6 +1,6 @@ use crate::catalog::{ColumnCatalog, ColumnRef, TableCatalog, TableName}; use crate::expression::simplify::ConstantBinary; -use crate::planner::operator::alter_table::{AddColumn, AlterTableOperator}; +use crate::planner::operator::alter_table::AddColumnOperator; use crate::storage::table_codec::TableCodec; use crate::storage::{ tuple_projection, Bounds, IndexIter, Iter, Projections, Storage, StorageError, Transaction, @@ -162,61 +162,52 @@ impl Transaction for KipTransaction { Ok(()) } - fn alter_table(&mut self, op: &AlterTableOperator) -> Result<(), StorageError> { - match op { - AlterTableOperator::AddColumn(AddColumn { - table_name, - if_not_exists, - column, - }) => { - // we need catalog generate col_id && index_id - // generally catalog is immutable, so do not worry it changed when alter table going on - if let Some(mut catalog) = self.table(table_name.clone()).cloned() { - // not yet supported default value - if !column.nullable { - return Err(StorageError::NeedNullAble); - } - - for col in catalog.all_columns() { - if col.name() == column.name() { - if *if_not_exists { - return Ok(()); - } else { - return Err(StorageError::DuplicateColumn); - } - } - } + fn add_column(&mut self, op: &AddColumnOperator) -> Result<(), StorageError> { + let AddColumnOperator { + table_name, + if_not_exists, + column, + } = op; + // we need catalog generate col_id && index_id + // generally catalog is immutable, so do not worry it changed when alter table going on + if let Some(mut catalog) = self.table(table_name.clone()).cloned() { + // not yet supported default value + if !column.nullable { + return Err(StorageError::NeedNullAble); + } - let col_id = catalog.add_column(column.clone())?; - - if column.desc.is_unique { - let meta = IndexMeta { - id: 0, - column_ids: vec![col_id], - name: format!("uk_{}", column.name()), - is_unique: true, - is_primary: false, - }; - let meta_ref = catalog.add_index_meta(meta); - let (key, value) = TableCodec::encode_index_meta(table_name, meta_ref)?; - self.tx.set(key, value); + for col in catalog.all_columns() { + if col.name() == column.name() { + if *if_not_exists { + return Ok(()); + } else { + return Err(StorageError::DuplicateColumn); } + } + } - let column = catalog.get_column_by_id(&col_id).unwrap(); - let (key, value) = TableCodec::encode_column(&table_name, column)?; - self.tx.set(key, value); + let col_id = catalog.add_column(column.clone())?; - Ok(()) - } else { - return Err(StorageError::TableNotFound); - } + if column.desc.is_unique { + let meta = IndexMeta { + id: 0, + column_ids: vec![col_id], + name: format!("uk_{}", column.name()), + is_unique: true, + is_primary: false, + }; + let meta_ref = catalog.add_index_meta(meta); + let (key, value) = TableCodec::encode_index_meta(table_name, meta_ref)?; + self.tx.set(key, value); } - AlterTableOperator::DropColumn => todo!(), - AlterTableOperator::DropPrimaryKey => todo!(), - AlterTableOperator::RenameColumn => todo!(), - AlterTableOperator::RenameTable => todo!(), - AlterTableOperator::ChangeColumn => todo!(), - AlterTableOperator::AlterColumn => todo!(), + + let column = catalog.get_column_by_id(&col_id).unwrap(); + let (key, value) = TableCodec::encode_column(&table_name, column)?; + self.tx.set(key, value); + + Ok(()) + } else { + return Err(StorageError::TableNotFound); } } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 7022aeb6..e4fb1f69 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -4,7 +4,7 @@ mod table_codec; use crate::catalog::{CatalogError, ColumnCatalog, TableCatalog, TableName}; use crate::expression::simplify::ConstantBinary; use crate::expression::ScalarExpression; -use crate::planner::operator::alter_table::AlterTableOperator; +use crate::planner::operator::alter_table::AddColumnOperator; use crate::storage::table_codec::TableCodec; use crate::types::errors::TypeError; use crate::types::index::{Index, IndexMetaRef}; @@ -68,7 +68,7 @@ pub trait Transaction: Sync + Send + 'static { ) -> Result<(), StorageError>; fn delete(&mut self, table_name: &str, tuple_id: TupleId) -> Result<(), StorageError>; - fn alter_table(&mut self, op: &AlterTableOperator) -> Result<(), StorageError>; + fn add_column(&mut self, op: &AddColumnOperator) -> Result<(), StorageError>; fn create_table( &mut self, table_name: TableName,