Skip to content

Commit

Permalink
Alter table (#104)
Browse files Browse the repository at this point in the history
* alter table

* refactor(column): remove field: `table_name`

* feat(create_table): add `default` on `CreateTable`

* feat(cast): support `CAST` Function

* feat: Constraints for CreateTable and Fix `Or` BinaryOperator in scope_aggregation cause index scan error

* version up

* feat(create_table): support `if not exists`

* version up

* code fmt

* support add column

* fix conflict

* split operator

* optimize

* optimize

* fix tuple decode err

* fmt

---------

Co-authored-by: Kould <[email protected]>
  • Loading branch information
guojidan and KKould authored Dec 5, 2023
1 parent bc211d7 commit eb9ab5a
Show file tree
Hide file tree
Showing 15 changed files with 242 additions and 6 deletions.
71 changes: 71 additions & 0 deletions src/binder/alter_table.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use sqlparser::ast::{AlterTableOperation, ObjectName};

use std::sync::Arc;

use super::Binder;
use crate::binder::{lower_case_name, split_name, BindError};
use crate::planner::operator::alter_table::add_column::AddColumnOperator;
use crate::planner::operator::scan::ScanOperator;
use crate::planner::operator::Operator;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;

impl<'a, T: Transaction> Binder<'a, T> {
pub(crate) fn bind_alter_table(
&mut self,
name: &ObjectName,
operation: &AlterTableOperation,
) -> Result<LogicalPlan, BindError> {
let table_name: Arc<String> = Arc::new(split_name(&lower_case_name(name))?.1.to_string());

let plan = match operation {
AlterTableOperation::AddColumn {
column_keyword: _,
if_not_exists,
column_def,
} => {
if let Some(table) = self.context.table(table_name.clone()) {
let plan = ScanOperator::build(table_name.clone(), table);

LogicalPlan {
operator: Operator::AddColumn(AddColumnOperator {
table_name,
if_not_exists: *if_not_exists,
column: self.bind_column(column_def)?,
}),
childrens: vec![plan],
}
} else {
return Err(BindError::InvalidTable(format!(
"not found table {}",
table_name
)));
}
}
AlterTableOperation::DropColumn {
column_name: _,
if_exists: _,
cascade: _,
} => todo!(),
AlterTableOperation::DropPrimaryKey => todo!(),
AlterTableOperation::RenameColumn {
old_column_name: _,
new_column_name: _,
} => todo!(),
AlterTableOperation::RenameTable { table_name: _ } => todo!(),
AlterTableOperation::ChangeColumn {
old_name: _,
new_name: _,
data_type: _,
options: _,
} => todo!(),
AlterTableOperation::AlterColumn {
column_name: _,
op: _,
} => todo!(),
_ => todo!(),
};

Ok(plan)
}
}
2 changes: 1 addition & 1 deletion src/binder/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
Ok(plan)
}

fn bind_column(&mut self, column_def: &ColumnDef) -> Result<ColumnCatalog, BindError> {
pub fn bind_column(&mut self, column_def: &ColumnDef) -> Result<ColumnCatalog, BindError> {
let column_name = column_def.name.to_string();
let mut column_desc = ColumnDesc::new(
LogicalType::try_from(column_def.data_type.clone())?,
Expand Down
1 change: 0 additions & 1 deletion src/binder/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ impl<'a, T: Transaction> Binder<'a, T> {
}
}
let mut rows = Vec::with_capacity(expr_rows.len());

for expr_row in expr_rows {
let mut row = Vec::with_capacity(expr_row.len());

Expand Down
2 changes: 2 additions & 0 deletions src/binder/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod aggregate;
mod alter_table;
pub mod copy;
mod create_table;
mod delete;
Expand Down Expand Up @@ -119,6 +120,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
pub fn bind(mut self, stmt: &Statement) -> Result<LogicalPlan, BindError> {
let plan = match stmt {
Statement::Query(query) => self.bind_query(query)?,
Statement::AlterTable { name, operation } => self.bind_alter_table(name, operation)?,
Statement::CreateTable {
name,
columns,
Expand Down
61 changes: 61 additions & 0 deletions src/execution/executor/ddl/alter_table/add_column.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use crate::execution::executor::BoxedExecutor;
use crate::types::tuple::Tuple;
use crate::types::value::DataValue;
use crate::{execution::ExecutorError, types::tuple_builder::TupleBuilder};
use futures_async_stream::try_stream;
use std::cell::RefCell;
use std::sync::Arc;

use crate::{
execution::executor::Executor, planner::operator::alter_table::add_column::AddColumnOperator,
storage::Transaction,
};

pub struct AddColumn {
op: AddColumnOperator,
input: BoxedExecutor,
}

impl From<(AddColumnOperator, BoxedExecutor)> for AddColumn {
fn from((op, input): (AddColumnOperator, BoxedExecutor)) -> Self {
Self { op, input }
}
}

impl<T: Transaction> Executor<T> for AddColumn {
fn execute(self, transaction: &RefCell<T>) -> crate::execution::executor::BoxedExecutor {
unsafe { self._execute(transaction.as_ptr().as_mut().unwrap()) }
}
}

impl AddColumn {
#[try_stream(boxed, ok = Tuple, error = ExecutorError)]
async fn _execute<T: Transaction>(self, transaction: &mut T) {
let _ = transaction.add_column(&self.op)?;

let AddColumnOperator {
table_name, column, ..
} = &self.op;

#[for_await]
for tuple in self.input {
let mut tuple: Tuple = tuple?;
let is_overwrite = true;

tuple.columns.push(Arc::new(column.clone()));
if let Some(value) = column.default_value() {
tuple.values.push(value);
} else {
tuple.values.push(Arc::new(DataValue::Null));
}

transaction.append(table_name, tuple, is_overwrite)?;
}
transaction.remove_cache(&table_name)?;

let tuple_builder = TupleBuilder::new_result();
let tuple = tuple_builder.push_result("ALTER TABLE SUCCESS", "1")?;

yield tuple;
}
}
1 change: 1 addition & 0 deletions src/execution/executor/ddl/alter_table/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod add_column;
1 change: 1 addition & 0 deletions src/execution/executor/ddl/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub(crate) mod alter_table;
pub(crate) mod create_table;
pub(crate) mod drop_table;
pub(crate) mod truncate;
6 changes: 6 additions & 0 deletions src/execution/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ use futures::stream::BoxStream;
use futures::TryStreamExt;
use std::cell::RefCell;

use self::ddl::alter_table::add_column::AddColumn;

pub type BoxedExecutor = BoxStream<'static, Result<Tuple, ExecutorError>>;

pub trait Executor<T: Transaction> {
Expand Down Expand Up @@ -104,6 +106,10 @@ pub fn build<T: Transaction>(plan: LogicalPlan, transaction: &RefCell<T>) -> Box
Delete::from((op, input)).execute(transaction)
}
Operator::Values(op) => Values::from(op).execute(transaction),
Operator::AddColumn(op) => {
let input = build(childrens.remove(0), transaction);
AddColumn::from((op, input)).execute(transaction)
}
Operator::CreateTable(op) => CreateTable::from(op).execute(transaction),
Operator::DropTable(op) => DropTable::from(op).execute(transaction),
Operator::Truncate(op) => Truncate::from(op).execute(transaction),
Expand Down
3 changes: 2 additions & 1 deletion src/optimizer/rule/column_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ impl ColumnPruning {
| Operator::Truncate(_)
| Operator::Show(_)
| Operator::CopyFromFile(_)
| Operator::CopyToFile(_) => (),
| Operator::CopyToFile(_)
| Operator::AddColumn(_) => (),
}
}

Expand Down
8 changes: 8 additions & 0 deletions src/planner/operator/alter_table/add_column.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
use crate::catalog::{ColumnCatalog, TableName};

#[derive(Debug, PartialEq, Clone)]
pub struct AddColumnOperator {
pub table_name: TableName,
pub if_not_exists: bool,
pub column: ColumnCatalog,
}
1 change: 1 addition & 0 deletions src/planner/operator/alter_table/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod add_column;
7 changes: 5 additions & 2 deletions src/planner/operator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod aggregate;
pub mod alter_table;
pub mod copy_from_file;
pub mod copy_to_file;
pub mod create_table;
Expand Down Expand Up @@ -31,8 +32,9 @@ use crate::planner::operator::values::ValuesOperator;
use itertools::Itertools;

use self::{
aggregate::AggregateOperator, filter::FilterOperator, join::JoinOperator, limit::LimitOperator,
project::ProjectOperator, scan::ScanOperator, sort::SortOperator,
aggregate::AggregateOperator, alter_table::add_column::AddColumnOperator,
filter::FilterOperator, join::JoinOperator, limit::LimitOperator, project::ProjectOperator,
scan::ScanOperator, sort::SortOperator,
};

#[derive(Debug, PartialEq, Clone)]
Expand All @@ -52,6 +54,7 @@ pub enum Operator {
Update(UpdateOperator),
Delete(DeleteOperator),
// DDL
AddColumn(AddColumnOperator),
CreateTable(CreateTableOperator),
DropTable(DropTableOperator),
Truncate(TruncateOperator),
Expand Down
53 changes: 53 additions & 0 deletions src/storage/kip.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::catalog::{ColumnCatalog, ColumnRef, TableCatalog, TableName};
use crate::expression::simplify::ConstantBinary;
use crate::planner::operator::alter_table::add_column::AddColumnOperator;
use crate::storage::table_codec::TableCodec;
use crate::storage::{
tuple_projection, Bounds, IndexIter, Iter, Projections, Storage, StorageError, Transaction,
Expand Down Expand Up @@ -161,6 +162,53 @@ impl Transaction for KipTransaction {
Ok(())
}

fn add_column(&mut self, op: &AddColumnOperator) -> Result<(), StorageError> {
let AddColumnOperator {
table_name,
if_not_exists,
column,
} = op;

if let Some(mut catalog) = self.table(table_name.clone()).cloned() {
if !column.nullable && column.default_value().is_none() {
return Err(StorageError::NeedNullAble);
}

for col in catalog.all_columns() {
if col.name() == column.name() {
if *if_not_exists {
return Ok(());
} else {
return Err(StorageError::DuplicateColumn);
}
}
}

let col_id = catalog.add_column(column.clone())?;

if column.desc.is_unique {
let meta = IndexMeta {
id: 0,
column_ids: vec![col_id],
name: format!("uk_{}", column.name()),
is_unique: true,
is_primary: false,
};
let meta_ref = catalog.add_index_meta(meta);
let (key, value) = TableCodec::encode_index_meta(table_name, meta_ref)?;
self.tx.set(key, value);
}

let column = catalog.get_column_by_id(&col_id).unwrap();
let (key, value) = TableCodec::encode_column(&table_name, column)?;
self.tx.set(key, value);

Ok(())
} else {
return Err(StorageError::TableNotFound);
}
}

fn create_table(
&mut self,
table_name: TableName,
Expand Down Expand Up @@ -266,6 +314,11 @@ impl Transaction for KipTransaction {

Ok(())
}

fn remove_cache(&self, key: &String) -> Result<(), StorageError> {
self.cache.remove(key);
Ok(())
}
}

impl KipTransaction {
Expand Down
12 changes: 11 additions & 1 deletion src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod table_codec;
use crate::catalog::{CatalogError, ColumnCatalog, TableCatalog, TableName};
use crate::expression::simplify::ConstantBinary;
use crate::expression::ScalarExpression;
use crate::planner::operator::alter_table::add_column::AddColumnOperator;
use crate::storage::table_codec::TableCodec;
use crate::types::errors::TypeError;
use crate::types::index::{Index, IndexMetaRef};
Expand Down Expand Up @@ -67,7 +68,7 @@ pub trait Transaction: Sync + Send + 'static {
) -> Result<(), StorageError>;

fn delete(&mut self, table_name: &str, tuple_id: TupleId) -> Result<(), StorageError>;

fn add_column(&mut self, op: &AddColumnOperator) -> Result<(), StorageError>;
fn create_table(
&mut self,
table_name: TableName,
Expand All @@ -83,6 +84,9 @@ pub trait Transaction: Sync + Send + 'static {

#[allow(async_fn_in_trait)]
async fn commit(self) -> Result<(), StorageError>;
fn remove_cache(&self, _key: &String) -> Result<(), StorageError> {
Ok(())
}
}

enum IndexValue {
Expand Down Expand Up @@ -313,6 +317,12 @@ pub enum StorageError {
#[error("The table not found")]
TableNotFound,

#[error("The some column already exists")]
DuplicateColumn,

#[error("Add column need nullable")]
NeedNullAble,

#[error("The table already exists")]
TableExists,
}
Expand Down
19 changes: 19 additions & 0 deletions tests/slt/alter_table.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
statement ok
create table alter_table(id int primary key, v1 int)

statement ok
insert into alter_table values (1,1), (2,2), (3,3), (4,4)

statement ok
alter table alter_table add column da int null

query IIII rowsort
select * from alter_table
----
1 1 null
2 2 null
3 3 null
4 4 null

statement ok
drop table alter_table

0 comments on commit eb9ab5a

Please sign in to comment.