Skip to content

Commit

Permalink
split operator
Browse files Browse the repository at this point in the history
  • Loading branch information
guojidan committed Dec 4, 2023
1 parent 33c3f0f commit 0cd6841
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 103 deletions.
8 changes: 3 additions & 5 deletions src/binder/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;

use super::Binder;
use crate::binder::{lower_case_name, split_name, BindError};
use crate::planner::operator::alter_table::{AddColumn, AlterTableOperator};
use crate::planner::operator::alter_table::AddColumnOperator;
use crate::planner::operator::scan::ScanOperator;
use crate::planner::operator::Operator;
use crate::planner::LogicalPlan;
Expand All @@ -18,8 +18,6 @@ impl<'a, T: Transaction> Binder<'a, T> {
) -> Result<LogicalPlan, BindError> {
let table_name: Arc<String> = Arc::new(split_name(&lower_case_name(name))?.1.to_string());

// we need convert ColumnDef to ColumnCatalog

let plan = match operation {
AlterTableOperation::AddColumn {
column_keyword: _,
Expand All @@ -30,11 +28,11 @@ impl<'a, T: Transaction> Binder<'a, T> {
let plan = ScanOperator::build(table_name.clone(), table);

LogicalPlan {
operator: Operator::AlterTable(AlterTableOperator::AddColumn(AddColumn {
operator: Operator::AddColumn(AddColumnOperator {
table_name,
if_not_exists: *if_not_exists,
column: self.bind_column(column_def)?,
})),
}),
childrens: vec![plan],
}
} else {
Expand Down
1 change: 0 additions & 1 deletion src/binder/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ impl<'a, T: Transaction> Binder<'a, T> {
}
}
let mut rows = Vec::with_capacity(expr_rows.len());
println!("{:?}", expr_rows);
for expr_row in expr_rows {
let mut row = Vec::with_capacity(expr_row.len());

Expand Down
49 changes: 23 additions & 26 deletions src/execution/executor/ddl/alter_table.rs
Original file line number Diff line number Diff line change
@@ -1,58 +1,55 @@
use crate::execution::executor::BoxedExecutor;
use crate::planner::operator::alter_table::AddColumn;
use crate::types::tuple::Tuple;
use crate::types::value::DataValue;
use crate::{execution::ExecutorError, types::tuple_builder::TupleBuilder};
use futures_async_stream::try_stream;
use std::cell::RefCell;
use std::ops::Deref;
use std::sync::Arc;

use crate::{
execution::executor::Executor, planner::operator::alter_table::AlterTableOperator,
execution::executor::Executor, planner::operator::alter_table::AddColumnOperator,
storage::Transaction,
};

pub struct AlterTable {
op: AlterTableOperator,
pub struct AddColumn {
op: AddColumnOperator,
input: BoxedExecutor,
}

impl From<(AlterTableOperator, BoxedExecutor)> for AlterTable {
fn from((op, input): (AlterTableOperator, BoxedExecutor)) -> Self {
impl From<(AddColumnOperator, BoxedExecutor)> for AddColumn {
fn from((op, input): (AddColumnOperator, BoxedExecutor)) -> Self {
Self { op, input }
}
}

impl<T: Transaction> Executor<T> for AlterTable {
impl<T: Transaction> Executor<T> for AddColumn {
fn execute(self, transaction: &RefCell<T>) -> crate::execution::executor::BoxedExecutor {
unsafe { self._execute(transaction.as_ptr().as_mut().unwrap()) }
}
}

impl AlterTable {
impl AddColumn {
#[try_stream(boxed, ok = Tuple, error = ExecutorError)]
async fn _execute<T: Transaction>(self, transaction: &mut T) {
let _ = transaction.alter_table(&self.op)?;
let _ = transaction.add_column(&self.op)?;

if let AlterTableOperator::AddColumn(AddColumn {
let AddColumnOperator {
table_name, column, ..
}) = &self.op
{
#[for_await]
for tuple in self.input {
let mut tuple: Tuple = tuple?;
let is_overwrite = true;

tuple.columns.push(Arc::new(column.clone()));
if let Some(value) = column.default_value() {
tuple.values.push(Arc::new(value.deref().clone()));
} else {
tuple.values.push(Arc::new(DataValue::Null));
}

transaction.append(table_name, tuple, is_overwrite)?;
} = &self.op;

#[for_await]
for tuple in self.input {
let mut tuple: Tuple = tuple?;
let is_overwrite = true;

tuple.columns.push(Arc::new(column.clone()));
if let Some(value) = column.default_value() {
tuple.values.push(value);
} else {
tuple.values.push(Arc::new(DataValue::Null));
}

transaction.append(table_name, tuple, is_overwrite)?;
}

let tuple_builder = TupleBuilder::new_result();
Expand Down
6 changes: 3 additions & 3 deletions src/execution/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use futures::stream::BoxStream;
use futures::TryStreamExt;
use std::cell::RefCell;

use self::ddl::alter_table::AlterTable;
use self::ddl::alter_table::AddColumn;

pub type BoxedExecutor = BoxStream<'static, Result<Tuple, ExecutorError>>;

Expand Down Expand Up @@ -106,9 +106,9 @@ pub fn build<T: Transaction>(plan: LogicalPlan, transaction: &RefCell<T>) -> Box
Delete::from((op, input)).execute(transaction)
}
Operator::Values(op) => Values::from(op).execute(transaction),
Operator::AlterTable(op) => {
Operator::AddColumn(op) => {
let input = build(childrens.remove(0), transaction);
AlterTable::from((op, input)).execute(transaction)
AddColumn::from((op, input)).execute(transaction)
}
Operator::CreateTable(op) => CreateTable::from(op).execute(transaction),
Operator::DropTable(op) => DropTable::from(op).execute(transaction),
Expand Down
2 changes: 1 addition & 1 deletion src/optimizer/rule/column_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl ColumnPruning {
| Operator::Show(_)
| Operator::CopyFromFile(_)
| Operator::CopyToFile(_)
| Operator::AlterTable(_) => (),
| Operator::AddColumn(_) => (),
}
}

Expand Down
13 changes: 1 addition & 12 deletions src/planner/operator/alter_table.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,7 @@
use crate::catalog::{ColumnCatalog, TableName};

#[derive(Debug, PartialEq, Clone)]
pub enum AlterTableOperator {
AddColumn(AddColumn),
DropColumn,
DropPrimaryKey,
RenameColumn,
RenameTable,
ChangeColumn,
AlterColumn,
}

#[derive(Debug, PartialEq, Clone)]
pub struct AddColumn {
pub struct AddColumnOperator {
pub table_name: TableName,
pub if_not_exists: bool,
pub column: ColumnCatalog,
Expand Down
4 changes: 2 additions & 2 deletions src/planner/operator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::planner::operator::values::ValuesOperator;
use itertools::Itertools;

use self::{
aggregate::AggregateOperator, alter_table::AlterTableOperator, filter::FilterOperator,
aggregate::AggregateOperator, alter_table::AddColumnOperator, filter::FilterOperator,
join::JoinOperator, limit::LimitOperator, project::ProjectOperator, scan::ScanOperator,
sort::SortOperator,
};
Expand All @@ -54,7 +54,7 @@ pub enum Operator {
Update(UpdateOperator),
Delete(DeleteOperator),
// DDL
AlterTable(AlterTableOperator),
AddColumn(AddColumnOperator),
CreateTable(CreateTableOperator),
DropTable(DropTableOperator),
Truncate(TruncateOperator),
Expand Down
93 changes: 42 additions & 51 deletions src/storage/kip.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::catalog::{ColumnCatalog, ColumnRef, TableCatalog, TableName};
use crate::expression::simplify::ConstantBinary;
use crate::planner::operator::alter_table::{AddColumn, AlterTableOperator};
use crate::planner::operator::alter_table::AddColumnOperator;
use crate::storage::table_codec::TableCodec;
use crate::storage::{
tuple_projection, Bounds, IndexIter, Iter, Projections, Storage, StorageError, Transaction,
Expand Down Expand Up @@ -162,61 +162,52 @@ impl Transaction for KipTransaction {
Ok(())
}

fn alter_table(&mut self, op: &AlterTableOperator) -> Result<(), StorageError> {
match op {
AlterTableOperator::AddColumn(AddColumn {
table_name,
if_not_exists,
column,
}) => {
// we need catalog generate col_id && index_id
// generally catalog is immutable, so do not worry it changed when alter table going on
if let Some(mut catalog) = self.table(table_name.clone()).cloned() {
// not yet supported default value
if !column.nullable {
return Err(StorageError::NeedNullAble);
}

for col in catalog.all_columns() {
if col.name() == column.name() {
if *if_not_exists {
return Ok(());
} else {
return Err(StorageError::DuplicateColumn);
}
}
}
fn add_column(&mut self, op: &AddColumnOperator) -> Result<(), StorageError> {
let AddColumnOperator {
table_name,
if_not_exists,
column,
} = op;
// we need catalog generate col_id && index_id
// generally catalog is immutable, so do not worry it changed when alter table going on
if let Some(mut catalog) = self.table(table_name.clone()).cloned() {
// not yet supported default value
if !column.nullable {
return Err(StorageError::NeedNullAble);
}

let col_id = catalog.add_column(column.clone())?;

if column.desc.is_unique {
let meta = IndexMeta {
id: 0,
column_ids: vec![col_id],
name: format!("uk_{}", column.name()),
is_unique: true,
is_primary: false,
};
let meta_ref = catalog.add_index_meta(meta);
let (key, value) = TableCodec::encode_index_meta(table_name, meta_ref)?;
self.tx.set(key, value);
for col in catalog.all_columns() {
if col.name() == column.name() {
if *if_not_exists {
return Ok(());
} else {
return Err(StorageError::DuplicateColumn);
}
}
}

let column = catalog.get_column_by_id(&col_id).unwrap();
let (key, value) = TableCodec::encode_column(&table_name, column)?;
self.tx.set(key, value);
let col_id = catalog.add_column(column.clone())?;

Ok(())
} else {
return Err(StorageError::TableNotFound);
}
if column.desc.is_unique {
let meta = IndexMeta {
id: 0,
column_ids: vec![col_id],
name: format!("uk_{}", column.name()),
is_unique: true,
is_primary: false,
};
let meta_ref = catalog.add_index_meta(meta);
let (key, value) = TableCodec::encode_index_meta(table_name, meta_ref)?;
self.tx.set(key, value);
}
AlterTableOperator::DropColumn => todo!(),
AlterTableOperator::DropPrimaryKey => todo!(),
AlterTableOperator::RenameColumn => todo!(),
AlterTableOperator::RenameTable => todo!(),
AlterTableOperator::ChangeColumn => todo!(),
AlterTableOperator::AlterColumn => todo!(),

let column = catalog.get_column_by_id(&col_id).unwrap();
let (key, value) = TableCodec::encode_column(&table_name, column)?;
self.tx.set(key, value);

Ok(())
} else {
return Err(StorageError::TableNotFound);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod table_codec;
use crate::catalog::{CatalogError, ColumnCatalog, TableCatalog, TableName};
use crate::expression::simplify::ConstantBinary;
use crate::expression::ScalarExpression;
use crate::planner::operator::alter_table::AlterTableOperator;
use crate::planner::operator::alter_table::AddColumnOperator;
use crate::storage::table_codec::TableCodec;
use crate::types::errors::TypeError;
use crate::types::index::{Index, IndexMetaRef};
Expand Down Expand Up @@ -68,7 +68,7 @@ pub trait Transaction: Sync + Send + 'static {
) -> Result<(), StorageError>;

fn delete(&mut self, table_name: &str, tuple_id: TupleId) -> Result<(), StorageError>;
fn alter_table(&mut self, op: &AlterTableOperator) -> Result<(), StorageError>;
fn add_column(&mut self, op: &AddColumnOperator) -> Result<(), StorageError>;
fn create_table(
&mut self,
table_name: TableName,
Expand Down

0 comments on commit 0cd6841

Please sign in to comment.