Skip to content

Commit

Permalink
feat: impl multiple primary keys
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Nov 6, 2024
1 parent 4683978 commit fe88322
Show file tree
Hide file tree
Showing 39 changed files with 569 additions and 214 deletions.
14 changes: 7 additions & 7 deletions src/binder/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ impl<T: Transaction> Binder<'_, '_, T> {
.find(|column| column.name() == column_name)
{
if *is_primary {
column.desc_mut().is_primary = true;
column.desc_mut().set_primary(true);
} else {
column.desc_mut().is_unique = true;
column.desc_mut().set_unique(true);
}
}
}
Expand All @@ -73,9 +73,9 @@ impl<T: Transaction> Binder<'_, '_, T> {
}
}

if columns.iter().filter(|col| col.desc().is_primary).count() != 1 {
if columns.iter().filter(|col| col.desc().is_primary()).count() == 0 {
return Err(DatabaseError::InvalidTable(
"The primary key field must exist and have at least one".to_string(),
"the primary key field must exist and have at least one".to_string(),
));
}

Expand Down Expand Up @@ -106,12 +106,12 @@ impl<T: Transaction> Binder<'_, '_, T> {
ColumnOption::NotNull => nullable = false,
ColumnOption::Unique { is_primary, .. } => {
if *is_primary {
column_desc.is_primary = true;
column_desc.set_primary(true);
nullable = false;
// Skip other options when using primary key
break;
} else {
column_desc.is_unique = true;
column_desc.set_unique(true);
}
}
ColumnOption::Default(expr) => {
Expand All @@ -125,7 +125,7 @@ impl<T: Transaction> Binder<'_, '_, T> {
if expr.return_type() != column_desc.column_datatype {
expr = ScalarExpression::TypeCast {
expr: Box::new(expr),
ty: column_desc.column_datatype,
ty: column_desc.column_datatype.clone(),
}
}
column_desc.default = Some(expr);
Expand Down
2 changes: 1 addition & 1 deletion src/binder/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl<T: Transaction> Binder<'_, '_, T> {
let schema_buf = self.table_schema_buf.entry(table_name.clone()).or_default();
let primary_key_column = source
.columns(schema_buf)
.find(|column| column.desc().is_primary)
.find(|column| column.desc().is_primary())
.cloned()
.unwrap();
let mut plan = match source {
Expand Down
2 changes: 1 addition & 1 deletion src/binder/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl<'a, T: Transaction> Binder<'a, '_, T> {
if ty == &LogicalType::SqlNull {
*ty = result_ty;
} else if ty != &result_ty {
return Err(DatabaseError::Incomparable(*ty, result_ty));
return Err(DatabaseError::Incomparable(ty.clone(), result_ty));
}
}

Expand Down
20 changes: 18 additions & 2 deletions src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ impl ColumnCatalog {
#[derive(Debug, Clone, PartialEq, Eq, Hash, ReferenceSerialization)]
pub struct ColumnDesc {
pub(crate) column_datatype: LogicalType,
pub(crate) is_primary: bool,
pub(crate) is_unique: bool,
is_primary: bool,
is_unique: bool,
pub(crate) default: Option<ScalarExpression>,
}

Expand All @@ -212,4 +212,20 @@ impl ColumnDesc {
default,
})
}

/// Returns `true` if this column is (part of) the table's primary key.
pub(crate) fn is_primary(&self) -> bool {
    self.is_primary
}

/// Marks (or unmarks) this column as part of the table's primary key.
///
/// Used by the binder when resolving `PRIMARY KEY` column options and
/// table-level primary-key constraints.
pub(crate) fn set_primary(&mut self, is_primary: bool) {
    // Terminate the assignment with `;` so the body is a statement, not a
    // trailing `()` expression (clippy::semicolon_if_nothing_returned).
    self.is_primary = is_primary;
}

/// Returns `true` if this column carries a `UNIQUE` constraint.
pub(crate) fn is_unique(&self) -> bool {
    self.is_unique
}

/// Marks (or unmarks) this column as carrying a `UNIQUE` constraint.
///
/// Used by the binder when resolving `UNIQUE` column options and
/// table-level unique constraints.
pub(crate) fn set_unique(&mut self, is_unique: bool) {
    // Terminate the assignment with `;` so the body is a statement, not a
    // trailing `()` expression (clippy::semicolon_if_nothing_returned).
    self.is_unique = is_unique;
}
}
38 changes: 30 additions & 8 deletions src/catalog/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub struct TableCatalog {
pub(crate) indexes: Vec<IndexMetaRef>,

schema_ref: SchemaRef,
primary_keys: Vec<(usize, ColumnRef)>,
}

//TODO: can add some like Table description and other information as attributes
Expand Down Expand Up @@ -73,17 +74,13 @@ impl TableCatalog {
self.columns.len()
}

pub(crate) fn primary_key(&self) -> Result<(usize, &ColumnRef), DatabaseError> {
self.schema_ref
.iter()
.enumerate()
.find(|(_, column)| column.desc().is_primary)
.ok_or(DatabaseError::PrimaryKeyNotFound)
/// Returns the table's primary-key columns as `(schema index, column)` pairs.
///
/// The slice is precomputed when the catalog is built (by filtering the
/// schema for `is_primary()` columns), so entries appear in schema order;
/// it may hold more than one entry for a composite primary key.
pub(crate) fn primary_keys(&self) -> &[(usize, ColumnRef)] {
    &self.primary_keys
}

pub(crate) fn types(&self) -> Vec<LogicalType> {
self.columns()
.map(|column| *column.datatype())
.map(|column| column.datatype().clone())
.collect_vec()
}

Expand Down Expand Up @@ -128,7 +125,17 @@ impl TableCatalog {
}

let index_id = self.indexes.last().map(|index| index.id + 1).unwrap_or(0);
let pk_ty = *self.primary_key()?.1.datatype();
let primary_keys = self.primary_keys();
let pk_ty = if primary_keys.len() == 1 {
primary_keys[0].1.datatype().clone()
} else {
LogicalType::Tuple(
primary_keys
.iter()
.map(|(_, column)| column.datatype().clone())
.collect_vec(),
)
};
let index = IndexMeta {
id: index_id,
column_ids,
Expand All @@ -154,13 +161,21 @@ impl TableCatalog {
columns: BTreeMap::new(),
indexes: vec![],
schema_ref: Arc::new(vec![]),
primary_keys: vec![],
};
let mut generator = Generator::new();
for col_catalog in columns.into_iter() {
let _ = table_catalog
.add_column(col_catalog, &mut generator)
.unwrap();
}
table_catalog.primary_keys = table_catalog
.schema_ref
.iter()
.enumerate()
.filter(|&(_, column)| column.desc().is_primary())
.map(|(i, column)| (i, column.clone()))
.collect_vec();

Ok(table_catalog)
}
Expand All @@ -182,13 +197,20 @@ impl TableCatalog {
columns.insert(column_id, i);
}
let schema_ref = Arc::new(column_refs.clone());
let primary_keys = schema_ref
.iter()
.enumerate()
.filter(|&(_, column)| column.desc().is_primary())
.map(|(i, column)| (i, column.clone()))
.collect_vec();

Ok(TableCatalog {
name,
column_idxs,
columns,
indexes,
schema_ref,
primary_keys,
})
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ pub enum DatabaseError {
),
#[error("must contain primary key!")]
PrimaryKeyNotFound,
#[error("primaryKey only allows single or multiple values")]
PrimaryKeyTooManyLayers,
#[error("rocksdb: {0}")]
RocksDB(
#[source]
Expand Down
6 changes: 3 additions & 3 deletions src/execution/ddl/add_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for AddColumn {
if_not_exists,
} = &self.op;

let mut unique_values = column.desc().is_unique.then(Vec::new);
let mut unique_values = column.desc().is_unique().then(Vec::new);
let mut tuples = Vec::new();
let schema = self.input.output_schema();
let mut types = Vec::with_capacity(schema.len() + 1);

for column_ref in schema.iter() {
types.push(*column_ref.datatype());
types.push(column_ref.datatype().clone());
}
types.push(*column.datatype());
types.push(column.datatype().clone());

let mut coroutine = build_read(self.input, cache, transaction);

Expand Down
4 changes: 2 additions & 2 deletions src/execution/ddl/drop_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropColumn {
.iter()
.enumerate()
.find(|(_, column)| column.name() == column_name)
.map(|(i, column)| (i, column.desc().is_primary))
.map(|(i, column)| (i, column.desc().is_primary()))
{
if is_primary {
throw!(Err(DatabaseError::InvalidColumn(
Expand All @@ -55,7 +55,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropColumn {
if i == column_index {
continue;
}
types.push(*column_ref.datatype());
types.push(column_ref.datatype().clone());
}
let mut coroutine = build_read(self.input, cache, transaction);

Expand Down
28 changes: 20 additions & 8 deletions src/execution/dml/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::types::tuple::Tuple;
use crate::types::tuple_builder::TupleBuilder;
use crate::types::value::DataValue;
use crate::types::ColumnId;
use itertools::Itertools;
use std::collections::HashMap;
use std::ops::Coroutine;
use std::ops::CoroutineState;
Expand Down Expand Up @@ -79,11 +80,14 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Insert {
let mut tuples = Vec::new();
let schema = input.output_schema().clone();

let pk_key = throw!(schema
let primary_keys = schema
.iter()
.find(|col| col.desc().is_primary)
.filter(|&col| col.desc().is_primary())
.map(|col| col.key(is_mapping_by_name))
.ok_or(DatabaseError::NotNull));
.collect_vec();
if primary_keys.is_empty() {
throw!(Err(DatabaseError::NotNull))
}

if let Some(table_catalog) =
throw!(transaction.table(cache.0, table_name.clone())).cloned()
Expand All @@ -94,14 +98,18 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Insert {
while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) {
let Tuple { values, .. } = throw!(tuple);

let mut tuple_id = Vec::with_capacity(primary_keys.len());
let mut tuple_map = HashMap::new();
for (i, value) in values.into_iter().enumerate() {
tuple_map.insert(schema[i].key(is_mapping_by_name), value);
}
let tuple_id = throw!(tuple_map
.get(&pk_key)
.cloned()
.ok_or(DatabaseError::NotNull));

for primary_key in primary_keys.iter() {
tuple_id.push(throw!(tuple_map
.get(primary_key)
.cloned()
.ok_or(DatabaseError::NotNull)));
}
let mut values = Vec::with_capacity(table_catalog.columns_len());

for col in table_catalog.columns() {
Expand All @@ -120,7 +128,11 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Insert {
values.push(value)
}
tuples.push(Tuple {
id: Some(tuple_id),
id: Some(if primary_keys.len() == 1 {
tuple_id.pop().unwrap()
} else {
Arc::new(DataValue::Tuple(Some(tuple_id)))
}),
values,
});
}
Expand Down
24 changes: 18 additions & 6 deletions src/execution/dml/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ use crate::types::index::Index;
use crate::types::tuple::types;
use crate::types::tuple::Tuple;
use crate::types::tuple_builder::TupleBuilder;
use crate::types::value::DataValue;
use std::collections::HashMap;
use std::ops::Coroutine;
use std::ops::CoroutineState;
use std::pin::Pin;
use std::sync::Arc;

pub struct Update {
table_name: TableName,
Expand Down Expand Up @@ -93,18 +95,28 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Update {
}
for mut tuple in tuples {
let mut is_overwrite = true;

let mut primary_keys = Vec::new();
for (i, column) in input_schema.iter().enumerate() {
if let Some(value) = value_map.get(&column.id()) {
if column.desc().is_primary {
let old_key = tuple.id.replace(value.clone()).unwrap();

throw!(transaction.remove_tuple(&table_name, &old_key));
is_overwrite = false;
if column.desc().is_primary() {
primary_keys.push(value.clone());
}
tuple.values[i] = value.clone();
}
}
if !primary_keys.is_empty() {
let id = if primary_keys.len() == 1 {
primary_keys.pop().unwrap()
} else {
Arc::new(DataValue::Tuple(Some(primary_keys)))
};
if &id != tuple.id.as_ref().unwrap() {
let old_key = tuple.id.replace(id).unwrap();

throw!(transaction.remove_tuple(&table_name, &old_key));
is_overwrite = false;
}
}
for (index_meta, exprs) in index_metas.iter() {
let values =
throw!(Projection::projection(&tuple, exprs, &input_schema));
Expand Down
2 changes: 1 addition & 1 deletion src/execution/dql/aggregate/min_max.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl MinMaxAccumulator {
Self {
inner: None,
op,
ty: *ty,
ty: ty.clone(),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/execution/dql/aggregate/sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ impl SumAccumulator {

Ok(Self {
result: DataValue::none(ty),
evaluator: EvaluatorFactory::binary_create(*ty, BinaryOperator::Plus)?,
evaluator: EvaluatorFactory::binary_create(ty.clone(), BinaryOperator::Plus)?,
})
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/execution/dql/describe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Describe {
let table = throw!(throw!(transaction.table(cache.0, self.table_name.clone()))
.ok_or(DatabaseError::TableNotFound));
let key_fn = |column: &ColumnCatalog| {
if column.desc().is_primary {
if column.desc().is_primary() {
PRIMARY_KEY_TYPE.clone()
} else if column.desc().is_unique {
} else if column.desc().is_unique() {
UNIQUE_KEY_TYPE.clone()
} else {
EMPTY_KEY_TYPE.clone()
Expand Down
3 changes: 2 additions & 1 deletion src/expression/evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,8 @@ impl ScalarExpression {
let mut when_value = when_expr.eval(tuple, schema)?;
let is_true = if let Some(operand_value) = &operand_value {
let ty = operand_value.logical_type();
let evaluator = EvaluatorFactory::binary_create(ty, BinaryOperator::Eq)?;
let evaluator =
EvaluatorFactory::binary_create(ty.clone(), BinaryOperator::Eq)?;

if when_value.logical_type() != ty {
when_value = Arc::new(DataValue::clone(&when_value).cast(&ty)?);
Expand Down
Loading

0 comments on commit fe88322

Please sign in to comment.