Skip to content

Commit

Permalink
feat: support drop column
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Dec 16, 2023
1 parent aeeba74 commit c9bf67f
Show file tree
Hide file tree
Showing 13 changed files with 239 additions and 61 deletions.
87 changes: 50 additions & 37 deletions src/binder/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;
use super::Binder;
use crate::binder::{lower_case_name, split_name, BindError};
use crate::planner::operator::alter_table::add_column::AddColumnOperator;
use crate::planner::operator::alter_table::drop_column::DropColumnOperator;
use crate::planner::operator::scan::ScanOperator;
use crate::planner::operator::Operator;
use crate::planner::LogicalPlan;
Expand All @@ -18,13 +19,13 @@ impl<'a, T: Transaction> Binder<'a, T> {
) -> Result<LogicalPlan, BindError> {
let table_name: Arc<String> = Arc::new(split_name(&lower_case_name(name))?.1.to_string());

let plan = match operation {
AlterTableOperation::AddColumn {
column_keyword: _,
if_not_exists,
column_def,
} => {
if let Some(table) = self.context.table(table_name.clone()) {
if let Some(table) = self.context.table(table_name.clone()) {
let plan = match operation {
AlterTableOperation::AddColumn {
column_keyword: _,
if_not_exists,
column_def,
} => {
let plan = ScanOperator::build(table_name.clone(), table);

LogicalPlan {
Expand All @@ -35,37 +36,49 @@ impl<'a, T: Transaction> Binder<'a, T> {
}),
childrens: vec![plan],
}
} else {
return Err(BindError::InvalidTable(format!(
"not found table {}",
table_name
)));
}
}
AlterTableOperation::DropColumn {
column_name: _,
if_exists: _,
cascade: _,
} => todo!(),
AlterTableOperation::DropPrimaryKey => todo!(),
AlterTableOperation::RenameColumn {
old_column_name: _,
new_column_name: _,
} => todo!(),
AlterTableOperation::RenameTable { table_name: _ } => todo!(),
AlterTableOperation::ChangeColumn {
old_name: _,
new_name: _,
data_type: _,
options: _,
} => todo!(),
AlterTableOperation::AlterColumn {
column_name: _,
op: _,
} => todo!(),
_ => todo!(),
};
AlterTableOperation::DropColumn {
column_name,
if_exists,
..
} => {
let plan = ScanOperator::build(table_name.clone(), table);
let column_name = column_name.value.clone();

LogicalPlan {
operator: Operator::DropColumn(DropColumnOperator {
table_name,
if_exists: *if_exists,
column_name,
}),
childrens: vec![plan],
}
}
AlterTableOperation::DropPrimaryKey => todo!(),
AlterTableOperation::RenameColumn {
old_column_name: _,
new_column_name: _,
} => todo!(),
AlterTableOperation::RenameTable { table_name: _ } => todo!(),
AlterTableOperation::ChangeColumn {
old_name: _,
new_name: _,
data_type: _,
options: _,
} => todo!(),
AlterTableOperation::AlterColumn {
column_name: _,
op: _,
} => todo!(),
_ => todo!(),
};

Ok(plan)
Ok(plan)
} else {
Err(BindError::InvalidTable(format!(
"not found table {}",
table_name
)))
}
}
}
20 changes: 16 additions & 4 deletions src/catalog/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ impl TableCatalog {
}

#[allow(dead_code)]
pub(crate) fn get_column_id_by_name(&self, name: &String) -> Option<ColumnId> {
pub(crate) fn get_column_id_by_name(&self, name: &str) -> Option<ColumnId> {
self.column_idxs.get(name).cloned()
}

pub(crate) fn get_column_by_name(&self, name: &String) -> Option<&ColumnRef> {
pub(crate) fn get_column_by_name(&self, name: &str) -> Option<&ColumnRef> {
let id = self.column_idxs.get(name)?;
self.columns.get(id)
}
Expand Down Expand Up @@ -65,10 +65,22 @@ impl TableCatalog {
Ok(col_id)
}

pub(crate) fn add_index_meta(&mut self, mut index: IndexMeta) -> &IndexMeta {
pub(crate) fn add_index_meta(
&mut self,
name: String,
column_ids: Vec<ColumnId>,
is_unique: bool,
is_primary: bool,
) -> &IndexMeta {
let index_id = self.indexes.len();

index.id = index_id as u32;
let index = IndexMeta {
id: index_id as u32,
column_ids,
name,
is_unique,
is_primary,
};
self.indexes.push(Arc::new(index));

&self.indexes[index_id]
Expand Down
2 changes: 1 addition & 1 deletion src/execution/executor/ddl/alter_table/add_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl From<(AddColumnOperator, BoxedExecutor)> for AddColumn {
}

impl<T: Transaction> Executor<T> for AddColumn {
fn execute(self, transaction: &RefCell<T>) -> crate::execution::executor::BoxedExecutor {
fn execute(self, transaction: &RefCell<T>) -> BoxedExecutor {
unsafe { self._execute(transaction.as_ptr().as_mut().unwrap()) }
}
}
Expand Down
76 changes: 76 additions & 0 deletions src/execution/executor/ddl/alter_table/drop_column.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use crate::binder::BindError;
use crate::execution::executor::{BoxedExecutor, Executor};
use crate::execution::ExecutorError;
use crate::planner::operator::alter_table::drop_column::DropColumnOperator;
use crate::storage::Transaction;
use crate::types::tuple::Tuple;
use crate::types::tuple_builder::TupleBuilder;
use futures_async_stream::try_stream;
use std::cell::RefCell;

pub struct DropColumn {
op: DropColumnOperator,
input: BoxedExecutor,
}

impl From<(DropColumnOperator, BoxedExecutor)> for DropColumn {
fn from((op, input): (DropColumnOperator, BoxedExecutor)) -> Self {
Self { op, input }
}
}

impl<T: Transaction> Executor<T> for DropColumn {
fn execute(self, transaction: &RefCell<T>) -> BoxedExecutor {
unsafe { self._execute(transaction.as_ptr().as_mut().unwrap()) }
}
}

impl DropColumn {
#[try_stream(boxed, ok = Tuple, error = ExecutorError)]
async fn _execute<T: Transaction>(self, transaction: &mut T) {
let DropColumnOperator {
table_name,
column_name,
if_exists,
} = &self.op;
let mut option_column_index = None;

#[for_await]
for tuple in self.input {
let mut tuple: Tuple = tuple?;

if option_column_index.is_none() {
if let Some((column_index, is_primary)) = tuple
.columns
.iter()
.enumerate()
.find(|(_, column)| column.name() == column_name)
.map(|(i, column)| (i, column.desc.is_primary))
{
if is_primary {
Err(BindError::InvalidColumn(
"drop of primary key column is not allowed.".to_owned(),
))?;
}
option_column_index = Some(column_index);
}
}
if option_column_index.is_none() && *if_exists {
return Ok(());
}
let column_index = option_column_index
.ok_or_else(|| BindError::InvalidColumn("not found column".to_string()))?;

let _ = tuple.columns.remove(column_index);
let _ = tuple.values.remove(column_index);

transaction.append(table_name, tuple, true)?;
}
transaction.drop_column(table_name, column_name, *if_exists)?;

let tuple_builder = TupleBuilder::new_result();
let tuple = tuple_builder.push_result("ALTER TABLE SUCCESS", "1")?;

yield tuple;
}
}
1 change: 1 addition & 0 deletions src/execution/executor/ddl/alter_table/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod add_column;
pub mod drop_column;
5 changes: 5 additions & 0 deletions src/execution/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub(crate) mod dml;
pub(crate) mod dql;
pub(crate) mod show;

use crate::execution::executor::ddl::alter_table::drop_column::DropColumn;
use crate::execution::executor::ddl::create_table::CreateTable;
use crate::execution::executor::ddl::drop_table::DropTable;
use crate::execution::executor::ddl::truncate::Truncate;
Expand Down Expand Up @@ -110,6 +111,10 @@ pub fn build<T: Transaction>(plan: LogicalPlan, transaction: &RefCell<T>) -> Box
let input = build(childrens.remove(0), transaction);
AddColumn::from((op, input)).execute(transaction)
}
Operator::DropColumn(op) => {
let input = build(childrens.remove(0), transaction);
DropColumn::from((op, input)).execute(transaction)
}
Operator::CreateTable(op) => CreateTable::from(op).execute(transaction),
Operator::DropTable(op) => DropTable::from(op).execute(transaction),
Operator::Truncate(op) => Truncate::from(op).execute(transaction),
Expand Down
3 changes: 2 additions & 1 deletion src/optimizer/rule/column_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ impl ColumnPruning {
| Operator::Show(_)
| Operator::CopyFromFile(_)
| Operator::CopyToFile(_)
| Operator::AddColumn(_) => (),
| Operator::AddColumn(_)
| Operator::DropColumn(_) => (),
}
}

Expand Down
8 changes: 8 additions & 0 deletions src/planner/operator/alter_table/drop_column.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
use crate::catalog::TableName;

#[derive(Debug, PartialEq, Clone)]
pub struct DropColumnOperator {
pub table_name: TableName,
pub column_name: String,
pub if_exists: bool,
}
1 change: 1 addition & 0 deletions src/planner/operator/alter_table/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod add_column;
pub mod drop_column;
2 changes: 2 additions & 0 deletions src/planner/operator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub mod update;
pub mod values;

use crate::catalog::ColumnRef;
use crate::planner::operator::alter_table::drop_column::DropColumnOperator;
use crate::planner::operator::copy_from_file::CopyFromFileOperator;
use crate::planner::operator::copy_to_file::CopyToFileOperator;
use crate::planner::operator::create_table::CreateTableOperator;
Expand Down Expand Up @@ -55,6 +56,7 @@ pub enum Operator {
Delete(DeleteOperator),
// DDL
AddColumn(AddColumnOperator),
DropColumn(DropColumnOperator),
CreateTable(CreateTableOperator),
DropTable(DropTableOperator),
Truncate(TruncateOperator),
Expand Down
64 changes: 48 additions & 16 deletions src/storage/kip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ use crate::storage::table_codec::TableCodec;
use crate::storage::{
tuple_projection, Bounds, IndexIter, Iter, Projections, Storage, StorageError, Transaction,
};
use crate::types::index::{Index, IndexMeta, IndexMetaRef};
use crate::types::index::{Index, IndexMetaRef};
use crate::types::tuple::{Tuple, TupleId};
use crate::types::ColumnId;
use kip_db::kernel::lsm::iterator::Iter as KipDBIter;
use kip_db::kernel::lsm::mvcc::{CheckType, TransactionIter};
use kip_db::kernel::lsm::storage::Config;
use kip_db::kernel::lsm::{mvcc, storage};
use kip_db::kernel::utils::lru_cache::ShardingLruCache;
use kip_db::KernelError;
use std::collections::hash_map::RandomState;
use std::collections::{Bound, VecDeque};
use std::path::PathBuf;
Expand Down Expand Up @@ -186,14 +187,12 @@ impl Transaction for KipTransaction {
let col_id = catalog.add_column(column.clone())?;

if column.desc.is_unique {
let meta = IndexMeta {
id: 0,
column_ids: vec![col_id],
name: format!("uk_{}", column.name()),
is_unique: true,
is_primary: false,
};
let meta_ref = catalog.add_index_meta(meta);
let meta_ref = catalog.add_index_meta(
format!("uk_{}", column.name()),
vec![col_id],
true,
false,
);
let (key, value) = TableCodec::encode_index_meta(table_name, meta_ref)?;
self.tx.set(key, value);
}
Expand All @@ -209,6 +208,41 @@ impl Transaction for KipTransaction {
}
}

fn drop_column(
&mut self,
table_name: &TableName,
column_name: &str,
if_exists: bool,
) -> Result<(), StorageError> {
if let Some(catalog) = self.table(table_name.clone()).cloned() {
let column = catalog.get_column_by_name(column_name).unwrap();

if let Some(index_meta) = catalog.get_unique_index(&column.id().unwrap()) {
let (index_meta_key, _) = TableCodec::encode_index_meta(table_name, index_meta)?;
self.tx.remove(&index_meta_key)?;

let (index_min, index_max) = TableCodec::index_bound(table_name, &index_meta.id);
Self::_drop_data(&mut self.tx, &index_min, &index_max)?;
}
let (key, _) = TableCodec::encode_column(&table_name, column)?;

match self.tx.remove(&key) {
Ok(_) => (),
Err(KernelError::KeyNotFound) => {
if !if_exists {
Err(KernelError::KeyNotFound)?;
}
}
err => err?,
}
self.cache.remove(table_name);

Ok(())
} else {
Err(StorageError::TableNotFound)
}
}

fn create_table(
&mut self,
table_name: TableName,
Expand Down Expand Up @@ -387,14 +421,12 @@ impl KipTransaction {
let prefix = if is_primary { "pk" } else { "uk" };

if let Some(col_id) = col.id() {
let meta = IndexMeta {
id: 0,
column_ids: vec![col_id],
name: format!("{}_{}", prefix, col.name()),
is_unique: col.desc.is_unique,
let meta_ref = table.add_index_meta(
format!("{}_{}", prefix, col.name()),
vec![col_id],
col.desc.is_unique,
is_primary,
};
let meta_ref = table.add_index_meta(meta);
);
let (key, value) = TableCodec::encode_index_meta(&table_name, meta_ref)?;

tx.set(key, value);
Expand Down
Loading

0 comments on commit c9bf67f

Please sign in to comment.