Skip to content

Commit

Permalink
feat: Update supports Set using expressions
Browse files Browse the repository at this point in the history
fix:
- primary key value position when declaring multiple primary keys(update & insert & delete)
- decimal type conversion
- the `Eq` expression of `PushPredicateIntoScan` must satisfy all index columns before it can be used
- `PrimaryKey` index supplementary composite index type
- `Tuple` deserialize may fail due to a large difference in the number of projection columns and table schema
  • Loading branch information
KKould committed Nov 9, 2024
1 parent 36aa42f commit a84d8ba
Show file tree
Hide file tree
Showing 45 changed files with 486 additions and 319 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ Cargo.lock
fncksql_data
fncksql_bench
sqlite_bench
fnck_sql_tpcc

tests/data/row_20000.csv
2 changes: 1 addition & 1 deletion src/binder/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl<T: Transaction> Binder<'_, '_, T> {
column_def,
} => {
let plan = TableScanOperator::build(table_name.clone(), table);
let column = self.bind_column(column_def)?;
let column = self.bind_column(column_def, None)?;

if !is_valid_identifier(column.name()) {
return Err(DatabaseError::InvalidColumn(
Expand Down
25 changes: 17 additions & 8 deletions src/binder/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ impl<T: Transaction> Binder<'_, '_, T> {
}
let mut columns: Vec<ColumnCatalog> = columns
.iter()
.map(|col| self.bind_column(col))
.enumerate()
.map(|(i, col)| self.bind_column(col, Some(i)))
.try_collect()?;
for constraint in constraints {
match constraint {
Expand All @@ -56,13 +57,17 @@ impl<T: Transaction> Binder<'_, '_, T> {
is_primary,
..
} => {
for column_name in column_names.iter().map(|ident| ident.value.to_lowercase()) {
for (i, column_name) in column_names
.iter()
.map(|ident| ident.value.to_lowercase())
.enumerate()
{
if let Some(column) = columns
.iter_mut()
.find(|column| column.name() == column_name)
{
if *is_primary {
column.desc_mut().set_primary(true);
column.desc_mut().set_primary(Some(i));
} else {
column.desc_mut().set_unique(true);
}
Expand All @@ -89,11 +94,15 @@ impl<T: Transaction> Binder<'_, '_, T> {
))
}

pub fn bind_column(&mut self, column_def: &ColumnDef) -> Result<ColumnCatalog, DatabaseError> {
pub fn bind_column(
&mut self,
column_def: &ColumnDef,
column_index: Option<usize>,
) -> Result<ColumnCatalog, DatabaseError> {
let column_name = column_def.name.value.to_lowercase();
let mut column_desc = ColumnDesc::new(
LogicalType::try_from(column_def.data_type.clone())?,
false,
None,
false,
None,
)?;
Expand All @@ -106,7 +115,7 @@ impl<T: Transaction> Binder<'_, '_, T> {
ColumnOption::NotNull => nullable = false,
ColumnOption::Unique { is_primary, .. } => {
if *is_primary {
column_desc.set_primary(true);
column_desc.set_primary(column_index);
nullable = false;
// Skip other options when using primary key
break;
Expand Down Expand Up @@ -184,15 +193,15 @@ mod tests {
debug_assert_eq!(op.columns[0].nullable(), false);
debug_assert_eq!(
op.columns[0].desc(),
&ColumnDesc::new(LogicalType::Integer, true, false, None)?
&ColumnDesc::new(LogicalType::Integer, Some(0), false, None)?
);
debug_assert_eq!(op.columns[1].name(), "name");
debug_assert_eq!(op.columns[1].nullable(), true);
debug_assert_eq!(
op.columns[1].desc(),
&ColumnDesc::new(
LogicalType::Varchar(Some(10), CharLengthUnits::Characters),
false,
None,
false,
None
)?
Expand Down
26 changes: 13 additions & 13 deletions src/binder/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::planner::operator::table_scan::TableScanOperator;
use crate::planner::operator::Operator;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use itertools::Itertools;
use sqlparser::ast::{Expr, TableAlias, TableFactor, TableWithJoins};
use std::sync::Arc;

Expand All @@ -23,20 +24,19 @@ impl<T: Transaction> Binder<'_, '_, T> {
table_alias = Some(Arc::new(name.value.to_lowercase()));
alias_idents = Some(columns);
}
let source = self
let Source::Table(table) = self
.context
.source_and_bind(table_name.clone(), table_alias.as_ref(), None, false)?
.ok_or(DatabaseError::SourceNotFound)?;
let schema_buf = self.table_schema_buf.entry(table_name.clone()).or_default();
let primary_key_column = source
.columns(schema_buf)
.find(|column| column.desc().is_primary())
.cloned()
.unwrap();
let mut plan = match source {
Source::Table(table) => TableScanOperator::build(table_name.clone(), table),
Source::View(view) => LogicalPlan::clone(&view.plan),
.source_and_bind(table_name.clone(), table_alias.as_ref(), None, true)?
.ok_or(DatabaseError::TableNotFound)?
else {
unreachable!()
};
let primary_keys = table
.primary_keys()
.iter()
.map(|(_, column)| column.clone())
.collect_vec();
let mut plan = TableScanOperator::build(table_name.clone(), table);

if let Some(alias_idents) = alias_idents {
plan =
Expand All @@ -50,7 +50,7 @@ impl<T: Transaction> Binder<'_, '_, T> {
Ok(LogicalPlan::new(
Operator::Delete(DeleteOperator {
table_name,
primary_key_column,
primary_keys,
}),
vec![plan],
))
Expand Down
6 changes: 3 additions & 3 deletions src/binder/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ impl<'a, T: Transaction> Binder<'a, '_, T> {
Ok(ScalarExpression::ColumnRef(
source
.column(&full_name.1, schema_buf)
.ok_or_else(|| DatabaseError::NotFound("column", full_name.1.to_string()))?,
.ok_or_else(|| DatabaseError::ColumnNotFound(full_name.1.to_string()))?,
))
} else {
let op =
Expand Down Expand Up @@ -373,7 +373,7 @@ impl<'a, T: Transaction> Binder<'a, '_, T> {
if let Some(parent) = self.parent {
op(&mut got_column, &parent.context, &mut self.table_schema_buf);
}
Ok(got_column.ok_or(DatabaseError::NotFound("column", full_name.1))?)
Ok(got_column.ok_or(DatabaseError::ColumnNotFound(full_name.1))?)
}
}

Expand Down Expand Up @@ -621,7 +621,7 @@ impl<'a, T: Transaction> Binder<'a, '_, T> {
}));
}

Err(DatabaseError::NotFound("function", summary.name))
Err(DatabaseError::FunctionNotFound(summary.name))
}

fn return_type(
Expand Down
8 changes: 4 additions & 4 deletions src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,12 +569,12 @@ pub mod test {
ColumnCatalog::new(
"c1".to_string(),
false,
ColumnDesc::new(Integer, true, false, None)?,
ColumnDesc::new(Integer, Some(0), false, None)?,
),
ColumnCatalog::new(
"c2".to_string(),
false,
ColumnDesc::new(Integer, false, true, None)?,
ColumnDesc::new(Integer, None, true, None)?,
),
],
false,
Expand All @@ -587,12 +587,12 @@ pub mod test {
ColumnCatalog::new(
"c3".to_string(),
false,
ColumnDesc::new(Integer, true, false, None)?,
ColumnDesc::new(Integer, Some(0), false, None)?,
),
ColumnCatalog::new(
"c4".to_string(),
false,
ColumnDesc::new(Integer, false, false, None)?,
ColumnDesc::new(Integer, None, false, None)?,
),
],
false,
Expand Down
49 changes: 19 additions & 30 deletions src/binder/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,52 +26,41 @@ impl<T: Transaction> Binder<'_, '_, T> {
if let Some(predicate) = selection {
plan = self.bind_where(plan, predicate)?;
}
let mut value_exprs = Vec::with_capacity(assignments.len());

let mut schema = Vec::with_capacity(assignments.len());
let mut row = Vec::with_capacity(assignments.len());

if assignments.is_empty() {
return Err(DatabaseError::ColumnsEmpty);
}
for Assignment { id, value } in assignments {
let mut expression = self.bind_expr(value)?;
expression.constant_calculation()?;
let expression = self.bind_expr(value)?;

for ident in id {
match self.bind_column_ref_from_identifiers(
slice::from_ref(ident),
Some(table_name.to_string()),
)? {
ScalarExpression::ColumnRef(column) => {
match &expression {
ScalarExpression::Constant(value) => {
let ty = column.datatype();
// Check if the value length is too long
value.check_len(ty)?;

let mut value = value.clone();
if value.logical_type() != *ty {
value = value.cast(ty)?;
}
row.push(value);
}
ScalarExpression::Empty => {
let default_value = column
.default_value()?
.ok_or(DatabaseError::DefaultNotExist)?;
row.push(default_value);
}
_ => return Err(DatabaseError::UnsupportedStmt(value.to_string())),
}
schema.push(column);
let expr = if matches!(expression, ScalarExpression::Empty) {
let default_value = column
.default_value()?
.ok_or(DatabaseError::DefaultNotExist)?;
ScalarExpression::Constant(default_value)
} else {
expression.clone()
};
value_exprs.push((column, expr));
}
_ => return Err(DatabaseError::InvalidColumn(ident.to_string())),
}
}
}
self.context.allow_default = false;
let values_plan = self.bind_values(vec![row], Arc::new(schema));

Ok(LogicalPlan::new(
Operator::Update(UpdateOperator { table_name }),
vec![plan, values_plan],
Operator::Update(UpdateOperator {
table_name,
value_exprs,
}),
vec![plan],
))
} else {
unreachable!("only table")
Expand Down
18 changes: 11 additions & 7 deletions src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl ColumnCatalog {
// SAFETY: default expr must not be [`ScalarExpression::ColumnRef`]
desc: ColumnDesc::new(
LogicalType::Varchar(None, CharLengthUnits::Characters),
false,
None,
false,
None,
)
Expand Down Expand Up @@ -187,15 +187,15 @@ impl ColumnCatalog {
#[derive(Debug, Clone, PartialEq, Eq, Hash, ReferenceSerialization)]
pub struct ColumnDesc {
pub(crate) column_datatype: LogicalType,
is_primary: bool,
primary: Option<usize>,
is_unique: bool,
pub(crate) default: Option<ScalarExpression>,
}

impl ColumnDesc {
pub fn new(
column_datatype: LogicalType,
is_primary: bool,
primary: Option<usize>,
is_unique: bool,
default: Option<ScalarExpression>,
) -> Result<ColumnDesc, DatabaseError> {
Expand All @@ -207,18 +207,22 @@ impl ColumnDesc {

Ok(ColumnDesc {
column_datatype,
is_primary,
primary,
is_unique,
default,
})
}

pub(crate) fn primary(&self) -> Option<usize> {
self.primary
}

pub(crate) fn is_primary(&self) -> bool {
self.is_primary
self.primary.is_some()
}

pub(crate) fn set_primary(&mut self, is_primary: bool) {
self.is_primary = is_primary
pub(crate) fn set_primary(&mut self, is_primary: Option<usize>) {
self.primary = is_primary
}

pub(crate) fn is_unique(&self) -> bool {
Expand Down
34 changes: 19 additions & 15 deletions src/catalog/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,7 @@ impl TableCatalog {
.add_column(col_catalog, &mut generator)
.unwrap();
}
table_catalog.primary_keys = table_catalog
.schema_ref
.iter()
.enumerate()
.filter(|&(_, column)| column.desc().is_primary())
.map(|(i, column)| (i, column.clone()))
.collect_vec();
table_catalog.primary_keys = Self::build_primary_keys(&table_catalog.schema_ref);

Ok(table_catalog)
}
Expand All @@ -197,12 +191,7 @@ impl TableCatalog {
columns.insert(column_id, i);
}
let schema_ref = Arc::new(column_refs.clone());
let primary_keys = schema_ref
.iter()
.enumerate()
.filter(|&(_, column)| column.desc().is_primary())
.map(|(i, column)| (i, column.clone()))
.collect_vec();
let primary_keys = Self::build_primary_keys(&schema_ref);

Ok(TableCatalog {
name,
Expand All @@ -213,6 +202,21 @@ impl TableCatalog {
primary_keys,
})
}

fn build_primary_keys(schema_ref: &Arc<Vec<ColumnRef>>) -> Vec<(usize, ColumnRef)> {
schema_ref
.iter()
.enumerate()
.filter_map(|(i, column)| {
column
.desc()
.primary()
.map(|p_i| (p_i, (i, column.clone())))
})
.sorted_by_key(|(p_i, _)| *p_i)
.map(|(_, entry)| entry)
.collect_vec()
}
}

impl TableMeta {
Expand All @@ -236,12 +240,12 @@ mod tests {
let col0 = ColumnCatalog::new(
"a".into(),
false,
ColumnDesc::new(LogicalType::Integer, false, false, None).unwrap(),
ColumnDesc::new(LogicalType::Integer, None, false, None).unwrap(),
);
let col1 = ColumnCatalog::new(
"b".into(),
false,
ColumnDesc::new(LogicalType::Boolean, false, false, None).unwrap(),
ColumnDesc::new(LogicalType::Boolean, None, false, None).unwrap(),
);
let col_catalogs = vec![col0, col1];
let table_catalog = TableCatalog::new(Arc::new("test".to_string()), col_catalogs).unwrap();
Expand Down
Loading

0 comments on commit a84d8ba

Please sign in to comment.