diff --git a/Cargo.toml b/Cargo.toml index 228c96d8..fe89d3f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,7 +38,6 @@ lazy_static = "1.4.0" comfy-table = "7.0.1" bytes = "1.5.0" kip_db = "0.1.2-alpha.17" -async-recursion = "1.0.5" rust_decimal = "1" csv = "1" diff --git a/src/binder/aggregate.rs b/src/binder/aggregate.rs index 7b21de5a..9e65e0ea 100644 --- a/src/binder/aggregate.rs +++ b/src/binder/aggregate.rs @@ -5,7 +5,7 @@ use std::collections::HashSet; use crate::binder::{BindError, InputRefType}; use crate::planner::LogicalPlan; -use crate::storage::Storage; +use crate::storage::Transaction; use crate::{ expression::ScalarExpression, planner::operator::{aggregate::AggregateOperator, sort::SortField}, @@ -13,7 +13,7 @@ use crate::{ use super::Binder; -impl Binder { +impl<'a, T: Transaction> Binder<'a, T> { pub fn bind_aggregate( &mut self, children: LogicalPlan, @@ -33,29 +33,28 @@ impl Binder { Ok(()) } - pub async fn extract_group_by_aggregate( + pub fn extract_group_by_aggregate( &mut self, select_list: &mut [ScalarExpression], groupby: &[Expr], ) -> Result<(), BindError> { - self.validate_groupby_illegal_column(select_list, groupby) - .await?; + self.validate_groupby_illegal_column(select_list, groupby)?; for gb in groupby { - let mut expr = self.bind_expr(gb).await?; + let mut expr = self.bind_expr(gb)?; self.visit_group_by_expr(select_list, &mut expr); } Ok(()) } - pub async fn extract_having_orderby_aggregate( + pub fn extract_having_orderby_aggregate( &mut self, having: &Option, orderbys: &[OrderByExpr], ) -> Result<(Option, Option>), BindError> { // Extract having expression. let return_having = if let Some(having) = having { - let mut having = self.bind_expr(having).await?; + let mut having = self.bind_expr(having)?; self.visit_column_agg_expr(&mut having, false)?; Some(having) @@ -72,7 +71,7 @@ impl Binder { asc, nulls_first, } = orderby; - let mut expr = self.bind_expr(expr).await?; + let mut expr = self.bind_expr(expr)?; self.visit_column_agg_expr(&mut expr, false)?; return_orderby.push(SortField::new( @@ -156,14 +155,14 @@ impl Binder { /// e.g. SELECT a,count(b) FROM t GROUP BY a. it's ok. /// SELECT a,b FROM t GROUP BY a. it's error. /// SELECT a,count(b) FROM t GROUP BY b. it's error. - async fn validate_groupby_illegal_column( + fn validate_groupby_illegal_column( &mut self, select_items: &[ScalarExpression], groupby: &[Expr], ) -> Result<(), BindError> { let mut group_raw_exprs = vec![]; for expr in groupby { - let expr = self.bind_expr(expr).await?; + let expr = self.bind_expr(expr)?; if let ScalarExpression::Alias { alias, .. } = expr { let alias_expr = select_items.iter().find(|column| { diff --git a/src/binder/copy.rs b/src/binder/copy.rs index ad671d72..8b148abb 100644 --- a/src/binder/copy.rs +++ b/src/binder/copy.rs @@ -44,13 +44,13 @@ impl std::fmt::Display for FileFormat { impl FromStr for ExtSource { type Err = (); - fn from_str(_s: &str) -> std::result::Result { + fn from_str(_s: &str) -> Result { Err(()) } } -impl Binder { - pub(super) async fn bind_copy( +impl<'a, T: Transaction> Binder<'a, T> { + pub(super) fn bind_copy( &mut self, source: CopySource, to: bool, @@ -69,7 +69,7 @@ impl Binder { } }; - if let Some(table) = self.context.storage.table(&table_name.to_string()).await { + if let Some(table) = self.context.transaction.table(&table_name.to_string()) { let cols = table.all_columns(); let ext_source = ExtSource { path: match target { diff --git a/src/binder/create_table.rs b/src/binder/create_table.rs index 80227721..bba6497f 100644 --- a/src/binder/create_table.rs +++ b/src/binder/create_table.rs @@ -9,9 +9,9 @@ use crate::catalog::ColumnCatalog; use crate::planner::operator::create_table::CreateTableOperator; use crate::planner::operator::Operator; use crate::planner::LogicalPlan; -use crate::storage::Storage; +use crate::storage::Transaction; -impl Binder { +impl<'a, T: Transaction> Binder<'a, T> { // TODO: TableConstraint pub(crate) fn bind_create_table( &mut self, @@ -60,19 +60,22 @@ mod tests { use super::*; use crate::binder::BinderContext; use crate::catalog::ColumnDesc; + use crate::execution::ExecutorError; use crate::storage::kip::KipStorage; + use crate::storage::Storage; use crate::types::LogicalType; use tempfile::TempDir; #[tokio::test] - async fn test_create_bind() { + async fn test_create_bind() -> Result<(), ExecutorError> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); - let storage = KipStorage::new(temp_dir.path()).await.unwrap(); + let storage = KipStorage::new(temp_dir.path()).await?; + let transaction = storage.transaction().await?; let sql = "create table t1 (id int primary key, name varchar(10) null)"; - let binder = Binder::new(BinderContext::new(storage)); + let binder = Binder::new(BinderContext::new(&transaction)); let stmt = crate::parser::parse_sql(sql).unwrap(); - let plan1 = binder.bind(&stmt[0]).await.unwrap(); + let plan1 = binder.bind(&stmt[0]).unwrap(); match plan1.operator { Operator::CreateTable(op) => { @@ -92,5 +95,7 @@ mod tests { } _ => unreachable!(), } + + Ok(()) } } diff --git a/src/binder/delete.rs b/src/binder/delete.rs index 247712d9..c75a0b36 100644 --- a/src/binder/delete.rs +++ b/src/binder/delete.rs @@ -2,11 +2,11 @@ use crate::binder::{lower_case_name, split_name, BindError, Binder}; use crate::planner::operator::delete::DeleteOperator; use crate::planner::operator::Operator; use crate::planner::LogicalPlan; -use crate::storage::Storage; +use crate::storage::Transaction; use sqlparser::ast::{Expr, TableFactor, TableWithJoins}; -impl Binder { - pub(crate) async fn bind_delete( +impl<'a, T: Transaction> Binder<'a, T> { + pub(crate) fn bind_delete( &mut self, from: &TableWithJoins, selection: &Option, @@ -14,10 +14,10 @@ impl Binder { if let TableFactor::Table { name, .. } = &from.relation { let name = lower_case_name(name); let (_, name) = split_name(&name)?; - let (table_name, mut plan) = self._bind_single_table_ref(None, name).await?; + let (table_name, mut plan) = self._bind_single_table_ref(None, name)?; if let Some(predicate) = selection { - plan = self.bind_where(plan, predicate).await?; + plan = self.bind_where(plan, predicate)?; } Ok(LogicalPlan { diff --git a/src/binder/distinct.rs b/src/binder/distinct.rs index fc0e1d59..1666040b 100644 --- a/src/binder/distinct.rs +++ b/src/binder/distinct.rs @@ -2,9 +2,9 @@ use crate::binder::Binder; use crate::expression::ScalarExpression; use crate::planner::operator::aggregate::AggregateOperator; use crate::planner::LogicalPlan; -use crate::storage::Storage; +use crate::storage::Transaction; -impl Binder { +impl<'a, T: Transaction> Binder<'a, T> { pub fn bind_distinct( &mut self, children: LogicalPlan, diff --git a/src/binder/drop_table.rs b/src/binder/drop_table.rs index 45e17b2a..0f604273 100644 --- a/src/binder/drop_table.rs +++ b/src/binder/drop_table.rs @@ -2,11 +2,11 @@ use crate::binder::{lower_case_name, split_name, BindError, Binder}; use crate::planner::operator::drop_table::DropTableOperator; use crate::planner::operator::Operator; use crate::planner::LogicalPlan; -use crate::storage::Storage; +use crate::storage::Transaction; use sqlparser::ast::ObjectName; use std::sync::Arc; -impl Binder { +impl<'a, T: Transaction> Binder<'a, T> { pub(crate) fn bind_drop_table(&mut self, name: &ObjectName) -> Result { let name = lower_case_name(&name); let (_, name) = split_name(&name)?; diff --git a/src/binder/expr.rs b/src/binder/expr.rs index da845f2b..f3a55a7a 100644 --- a/src/binder/expr.rs +++ b/src/binder/expr.rs @@ -1,6 +1,5 @@ use crate::binder::BindError; use crate::expression::agg::AggKind; -use async_recursion::async_recursion; use itertools::Itertools; use sqlparser::ast::{ BinaryOperator, Expr, Function, FunctionArg, FunctionArgExpr, Ident, UnaryOperator, @@ -10,35 +9,29 @@ use std::sync::Arc; use super::Binder; use crate::expression::ScalarExpression; -use crate::storage::Storage; +use crate::storage::Transaction; use crate::types::value::DataValue; use crate::types::LogicalType; -impl Binder { - #[async_recursion] - pub(crate) async fn bind_expr(&mut self, expr: &Expr) -> Result { +impl<'a, T: Transaction> Binder<'a, T> { + pub(crate) fn bind_expr(&mut self, expr: &Expr) -> Result { match expr { Expr::Identifier(ident) => { self.bind_column_ref_from_identifiers(slice::from_ref(ident), None) - .await - } - Expr::CompoundIdentifier(idents) => { - self.bind_column_ref_from_identifiers(idents, None).await - } - Expr::BinaryOp { left, right, op } => { - self.bind_binary_op_internal(left, right, op).await } + Expr::CompoundIdentifier(idents) => self.bind_column_ref_from_identifiers(idents, None), + Expr::BinaryOp { left, right, op } => self.bind_binary_op_internal(left, right, op), Expr::Value(v) => Ok(ScalarExpression::Constant(Arc::new(v.into()))), - Expr::Function(func) => self.bind_agg_call(func).await, - Expr::Nested(expr) => self.bind_expr(expr).await, - Expr::UnaryOp { expr, op } => self.bind_unary_op_internal(expr, op).await, + Expr::Function(func) => self.bind_agg_call(func), + Expr::Nested(expr) => self.bind_expr(expr), + Expr::UnaryOp { expr, op } => self.bind_unary_op_internal(expr, op), _ => { todo!() } } } - pub async fn bind_column_ref_from_identifiers( + pub fn bind_column_ref_from_identifiers( &mut self, idents: &[Ident], bind_table_name: Option<&String>, @@ -66,9 +59,8 @@ impl Binder { if let Some(table) = table_name.or(bind_table_name) { let table_catalog = self .context - .storage + .transaction .table(table) - .await .ok_or_else(|| BindError::InvalidTable(table.to_string()))?; let column_catalog = table_catalog @@ -100,14 +92,14 @@ impl Binder { } } - async fn bind_binary_op_internal( + fn bind_binary_op_internal( &mut self, left: &Expr, right: &Expr, op: &BinaryOperator, ) -> Result { - let left_expr = Box::new(self.bind_expr(left).await?); - let right_expr = Box::new(self.bind_expr(right).await?); + let left_expr = Box::new(self.bind_expr(left)?); + let right_expr = Box::new(self.bind_expr(right)?); let ty = match op { BinaryOperator::Plus @@ -137,12 +129,12 @@ impl Binder { }) } - async fn bind_unary_op_internal( + fn bind_unary_op_internal( &mut self, expr: &Expr, op: &UnaryOperator, ) -> Result { - let expr = Box::new(self.bind_expr(expr).await?); + let expr = Box::new(self.bind_expr(expr)?); let ty = if let UnaryOperator::Not = op { LogicalType::Boolean } else { @@ -156,7 +148,7 @@ impl Binder { }) } - async fn bind_agg_call(&mut self, func: &Function) -> Result { + fn bind_agg_call(&mut self, func: &Function) -> Result { let mut args = Vec::with_capacity(func.args.len()); for arg in func.args.iter() { @@ -165,7 +157,7 @@ impl Binder { FunctionArg::Unnamed(arg) => arg, }; match arg_expr { - FunctionArgExpr::Expr(expr) => args.push(self.bind_expr(expr).await?), + FunctionArgExpr::Expr(expr) => args.push(self.bind_expr(expr)?), FunctionArgExpr::Wildcard => args.push(Self::wildcard_expr()), _ => todo!(), } diff --git a/src/binder/insert.rs b/src/binder/insert.rs index 85164f36..0122dc06 100644 --- a/src/binder/insert.rs +++ b/src/binder/insert.rs @@ -6,14 +6,14 @@ use crate::planner::operator::insert::InsertOperator; use crate::planner::operator::values::ValuesOperator; use crate::planner::operator::Operator; use crate::planner::LogicalPlan; -use crate::storage::Storage; +use crate::storage::Transaction; use crate::types::value::{DataValue, ValueRef}; use sqlparser::ast::{Expr, Ident, ObjectName}; use std::slice; use std::sync::Arc; -impl Binder { - pub(crate) async fn bind_insert( +impl<'a, T: Transaction> Binder<'a, T> { + pub(crate) fn bind_insert( &mut self, name: ObjectName, idents: &[Ident], @@ -24,7 +24,7 @@ impl Binder { let (_, name) = split_name(&name)?; let table_name = Arc::new(name.to_string()); - if let Some(table) = self.context.storage.table(&table_name).await { + if let Some(table) = self.context.transaction.table(&table_name) { let mut columns = Vec::new(); if idents.is_empty() { @@ -32,13 +32,10 @@ impl Binder { } else { let bind_table_name = Some(table_name.to_string()); for ident in idents { - match self - .bind_column_ref_from_identifiers( - slice::from_ref(ident), - bind_table_name.as_ref(), - ) - .await? - { + match self.bind_column_ref_from_identifiers( + slice::from_ref(ident), + bind_table_name.as_ref(), + )? { ScalarExpression::ColumnRef(catalog) => columns.push(catalog), _ => unreachable!(), } @@ -50,7 +47,7 @@ impl Binder { let mut row = Vec::with_capacity(expr_row.len()); for (i, expr) in expr_row.into_iter().enumerate() { - match &self.bind_expr(expr).await? { + match &self.bind_expr(expr)? { ScalarExpression::Constant(value) => { // Check if the value length is too long value.check_len(columns[i].datatype())?; diff --git a/src/binder/mod.rs b/src/binder/mod.rs index 134b6fa9..0081d397 100644 --- a/src/binder/mod.rs +++ b/src/binder/mod.rs @@ -18,7 +18,7 @@ use crate::catalog::{CatalogError, TableCatalog, TableName, DEFAULT_SCHEMA_NAME} use crate::expression::ScalarExpression; use crate::planner::operator::join::JoinType; use crate::planner::LogicalPlan; -use crate::storage::Storage; +use crate::storage::Transaction; use crate::types::errors::TypeError; pub enum InputRefType { @@ -27,18 +27,18 @@ pub enum InputRefType { } #[derive(Clone)] -pub struct BinderContext { - pub(crate) storage: S, +pub struct BinderContext<'a, T: Transaction> { + pub(crate) transaction: &'a T, pub(crate) bind_table: BTreeMap)>, aliases: BTreeMap, group_by_exprs: Vec, pub(crate) agg_calls: Vec, } -impl BinderContext { - pub fn new(storage: S) -> Self { +impl<'a, T: Transaction> BinderContext<'a, T> { + pub fn new(transaction: &'a T) -> Self { BinderContext { - storage, + transaction, bind_table: Default::default(), aliases: Default::default(), group_by_exprs: vec![], @@ -67,18 +67,18 @@ impl BinderContext { } } -pub struct Binder { - context: BinderContext, +pub struct Binder<'a, T: Transaction> { + context: BinderContext<'a, T>, } -impl Binder { - pub fn new(context: BinderContext) -> Self { +impl<'a, T: Transaction> Binder<'a, T> { + pub fn new(context: BinderContext<'a, T>) -> Self { Binder { context } } - pub async fn bind(mut self, stmt: &Statement) -> Result { + pub fn bind(mut self, stmt: &Statement) -> Result { let plan = match stmt { - Statement::Query(query) => self.bind_query(query).await?, + Statement::Query(query) => self.bind_query(query)?, Statement::CreateTable { name, columns, @@ -99,8 +99,7 @@ impl Binder { .. } => { if let SetExpr::Values(values) = source.body.as_ref() { - self.bind_insert(table_name.to_owned(), columns, &values.rows, *overwrite) - .await? + self.bind_insert(table_name.to_owned(), columns, &values.rows, *overwrite)? } else { todo!() } @@ -114,7 +113,7 @@ impl Binder { if !table.joins.is_empty() { unimplemented!() } else { - self.bind_update(table, selection, assignments).await? + self.bind_update(table, selection, assignments)? } } Statement::Delete { @@ -125,10 +124,10 @@ impl Binder { if !table.joins.is_empty() { unimplemented!() } else { - self.bind_delete(table, selection).await? + self.bind_delete(table, selection)? } } - Statement::Truncate { table_name, .. } => self.bind_truncate(table_name).await?, + Statement::Truncate { table_name, .. } => self.bind_truncate(table_name)?, Statement::ShowTables { .. } => self.bind_show_tables()?, Statement::Copy { source, @@ -136,10 +135,7 @@ impl Binder { target, options, .. - } => { - self.bind_copy(source.clone(), *to, target.clone(), &options) - .await? - } + } => self.bind_copy(source.clone(), *to, target.clone(), &options)?, _ => return Err(BindError::UnsupportedStmt(stmt.to_string())), }; Ok(plan) @@ -198,7 +194,7 @@ pub mod test { use crate::execution::ExecutorError; use crate::planner::LogicalPlan; use crate::storage::kip::KipStorage; - use crate::storage::{Storage, StorageError}; + use crate::storage::{Storage, StorageError, Transaction}; use crate::types::LogicalType::Integer; use std::path::PathBuf; use std::sync::Arc; @@ -208,46 +204,45 @@ pub mod test { path: impl Into + Send, ) -> Result { let storage = KipStorage::new(path).await?; + let mut transaction = storage.transaction().await?; + + let _ = transaction.create_table( + Arc::new("t1".to_string()), + vec![ + ColumnCatalog::new( + "c1".to_string(), + false, + ColumnDesc::new(Integer, true, false), + None, + ), + ColumnCatalog::new( + "c2".to_string(), + false, + ColumnDesc::new(Integer, false, true), + None, + ), + ], + )?; - let _ = storage - .create_table( - Arc::new("t1".to_string()), - vec![ - ColumnCatalog::new( - "c1".to_string(), - false, - ColumnDesc::new(Integer, true, false), - None, - ), - ColumnCatalog::new( - "c2".to_string(), - false, - ColumnDesc::new(Integer, false, true), - None, - ), - ], - ) - .await?; + let _ = transaction.create_table( + Arc::new("t2".to_string()), + vec![ + ColumnCatalog::new( + "c3".to_string(), + false, + ColumnDesc::new(Integer, true, false), + None, + ), + ColumnCatalog::new( + "c4".to_string(), + false, + ColumnDesc::new(Integer, false, false), + None, + ), + ], + )?; - let _ = storage - .create_table( - Arc::new("t2".to_string()), - vec![ - ColumnCatalog::new( - "c3".to_string(), - false, - ColumnDesc::new(Integer, true, false), - None, - ), - ColumnCatalog::new( - "c4".to_string(), - false, - ColumnDesc::new(Integer, false, false), - None, - ), - ], - ) - .await?; + transaction.commit().await?; Ok(storage) } @@ -255,9 +250,10 @@ pub mod test { pub async fn select_sql_run(sql: &str) -> Result { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = build_test_catalog(temp_dir.path()).await?; - let binder = Binder::new(BinderContext::new(storage)); + let transaction = storage.transaction().await?; + let binder = Binder::new(BinderContext::new(&transaction)); let stmt = crate::parser::parse_sql(sql)?; - Ok(binder.bind(&stmt[0]).await?) + Ok(binder.bind(&stmt[0])?) } } diff --git a/src/binder/select.rs b/src/binder/select.rs index 585aadff..11dc168f 100644 --- a/src/binder/select.rs +++ b/src/binder/select.rs @@ -1,4 +1,3 @@ -use async_recursion::async_recursion; use std::borrow::Borrow; use std::collections::HashMap; use std::sync::Arc; @@ -26,7 +25,7 @@ use crate::expression::BinaryOperator; use crate::planner::operator::join::JoinCondition; use crate::planner::operator::sort::{SortField, SortOperator}; use crate::planner::LogicalPlan; -use crate::storage::Storage; +use crate::storage::Transaction; use crate::types::LogicalType; use itertools::Itertools; use sqlparser::ast; @@ -35,16 +34,15 @@ use sqlparser::ast::{ SelectItem, SetExpr, TableFactor, TableWithJoins, }; -impl Binder { - #[async_recursion] - pub(crate) async fn bind_query(&mut self, query: &Query) -> Result { +impl<'a, T: Transaction> Binder<'a, T> { + pub(crate) fn bind_query(&mut self, query: &Query) -> Result { if let Some(_with) = &query.with { // TODO support with clause. } let mut plan = match query.body.borrow() { - SetExpr::Select(select) => self.bind_select(select, &query.order_by).await, - SetExpr::Query(query) => self.bind_query(query).await, + SetExpr::Select(select) => self.bind_select(select, &query.order_by), + SetExpr::Query(query) => self.bind_query(query), _ => unimplemented!(), }?; @@ -52,43 +50,40 @@ impl Binder { let offset = &query.offset; if limit.is_some() || offset.is_some() { - plan = self.bind_limit(plan, limit, offset).await?; + plan = self.bind_limit(plan, limit, offset)?; } Ok(plan) } - async fn bind_select( + fn bind_select( &mut self, select: &Select, orderby: &[OrderByExpr], ) -> Result { - let mut plan = self.bind_table_ref(&select.from).await?; + let mut plan = self.bind_table_ref(&select.from)?; // Resolve scalar function call. // TODO support SRF(Set-Returning Function). - let mut select_list = self.normalize_select_item(&select.projection).await?; + let mut select_list = self.normalize_select_item(&select.projection)?; self.extract_select_join(&mut select_list); if let Some(predicate) = &select.selection { - plan = self.bind_where(plan, predicate).await?; + plan = self.bind_where(plan, predicate)?; } self.extract_select_aggregate(&mut select_list)?; if !select.group_by.is_empty() { - self.extract_group_by_aggregate(&mut select_list, &select.group_by) - .await?; + self.extract_group_by_aggregate(&mut select_list, &select.group_by)?; } let mut having_orderby = (None, None); if select.having.is_some() || !orderby.is_empty() { - having_orderby = self - .extract_having_orderby_aggregate(&select.having, orderby) - .await?; + having_orderby = self.extract_having_orderby_aggregate(&select.having, orderby)?; } if !self.context.agg_calls.is_empty() || !self.context.group_by_exprs.is_empty() { @@ -116,7 +111,7 @@ impl Binder { Ok(plan) } - pub(crate) async fn bind_table_ref( + pub(crate) fn bind_table_ref( &mut self, from: &[TableWithJoins], ) -> Result { @@ -130,17 +125,17 @@ impl Binder { let TableWithJoins { relation, joins } = &from[0]; - let (left_name, mut plan) = self.bind_single_table_ref(relation, None).await?; + let (left_name, mut plan) = self.bind_single_table_ref(relation, None)?; if !joins.is_empty() { for join in joins { - plan = self.bind_join(left_name.clone(), plan, join).await?; + plan = self.bind_join(left_name.clone(), plan, join)?; } } Ok(plan) } - async fn bind_single_table_ref( + fn bind_single_table_ref( &mut self, table: &TableFactor, joint_type: Option, @@ -164,7 +159,7 @@ impl Binder { table = &alias.name.value; } - self._bind_single_table_ref(joint_type, table).await? + self._bind_single_table_ref(joint_type, table)? } _ => unimplemented!(), }; @@ -172,7 +167,7 @@ impl Binder { Ok(plan_with_name) } - pub(crate) async fn _bind_single_table_ref( + pub(crate) fn _bind_single_table_ref( &mut self, joint_type: Option, table: &str, @@ -185,9 +180,8 @@ impl Binder { let table_catalog = self .context - .storage + .transaction .table(&table_name) - .await .ok_or_else(|| BindError::InvalidTable(format!("bind table {}", table)))?; self.context @@ -206,7 +200,7 @@ impl Binder { /// - Qualified name with wildcard, e.g. `SELECT t.* FROM t,t1` /// - Scalar expression or aggregate expression, e.g. `SELECT COUNT(*) + 1 AS count FROM t` /// - async fn normalize_select_item( + fn normalize_select_item( &mut self, items: &[SelectItem], ) -> Result, BindError> { @@ -214,9 +208,9 @@ impl Binder { for item in items.iter().enumerate() { match item.1 { - SelectItem::UnnamedExpr(expr) => select_items.push(self.bind_expr(expr).await?), + SelectItem::UnnamedExpr(expr) => select_items.push(self.bind_expr(expr)?), SelectItem::ExprWithAlias { expr, alias } => { - let expr = self.bind_expr(expr).await?; + let expr = self.bind_expr(expr)?; let alias_name = alias.to_string(); self.context.add_alias(alias_name.clone(), expr.clone()); @@ -227,7 +221,7 @@ impl Binder { }); } SelectItem::Wildcard(_) => { - select_items.extend_from_slice(self.bind_all_column_refs().await?.as_slice()); + select_items.extend_from_slice(self.bind_all_column_refs()?.as_slice()); } _ => todo!("bind select list"), @@ -237,14 +231,13 @@ impl Binder { Ok(select_items) } - async fn bind_all_column_refs(&mut self) -> Result, BindError> { + fn bind_all_column_refs(&mut self) -> Result, BindError> { let mut exprs = vec![]; for table_name in self.context.bind_table.keys().cloned() { let table = self .context - .storage + .transaction .table(&table_name) - .await .ok_or_else(|| BindError::InvalidTable(table_name.to_string()))?; for col in table.all_columns() { exprs.push(ScalarExpression::ColumnRef(col)); @@ -254,7 +247,7 @@ impl Binder { Ok(exprs) } - async fn bind_join( + fn bind_join( &mut self, left_table: TableName, left: LogicalPlan, @@ -274,43 +267,36 @@ impl Binder { _ => unimplemented!(), }; - let (right_table, right) = self - .bind_single_table_ref(relation, Some(join_type)) - .await?; + let (right_table, right) = self.bind_single_table_ref(relation, Some(join_type))?; let left_table = self .context - .storage + .transaction .table(&left_table) - .await .cloned() .ok_or_else(|| BindError::InvalidTable(format!("Left: {} not found", left_table)))?; let right_table = self .context - .storage + .transaction .table(&right_table) - .await .cloned() .ok_or_else(|| BindError::InvalidTable(format!("Right: {} not found", right_table)))?; let on = match joint_condition { - Some(constraint) => { - self.bind_join_constraint(&left_table, &right_table, constraint) - .await? - } + Some(constraint) => self.bind_join_constraint(&left_table, &right_table, constraint)?, None => JoinCondition::None, }; Ok(LJoinOperator::new(left, right, on, join_type)) } - pub(crate) async fn bind_where( + pub(crate) fn bind_where( &mut self, children: LogicalPlan, predicate: &Expr, ) -> Result { Ok(FilterOperator::new( - self.bind_expr(predicate).await?, + self.bind_expr(predicate)?, children, false, )) @@ -348,7 +334,7 @@ impl Binder { } } - async fn bind_limit( + fn bind_limit( &mut self, children: LogicalPlan, limit_expr: &Option, @@ -357,7 +343,7 @@ impl Binder { let mut limit = 0; let mut offset = 0; if let Some(expr) = limit_expr { - let expr = self.bind_expr(expr).await?; + let expr = self.bind_expr(expr)?; match expr { ScalarExpression::Constant(dv) => match dv.as_ref() { DataValue::Int32(Some(v)) if *v > 0 => limit = *v as usize, @@ -377,7 +363,7 @@ impl Binder { } if let Some(expr) = offset_expr { - let expr = self.bind_expr(&expr.value).await?; + let expr = self.bind_expr(&expr.value)?; match expr { ScalarExpression::Constant(dv) => match dv.as_ref() { DataValue::Int32(Some(v)) if *v > 0 => offset = *v as usize, @@ -437,7 +423,7 @@ impl Binder { } } - async fn bind_join_constraint( + fn bind_join_constraint( &mut self, left_table: &TableCatalog, right_table: &TableCatalog, @@ -450,8 +436,7 @@ impl Binder { // expression that didn't match equi-join pattern let mut filter = vec![]; - self.extract_join_keys(expr, &mut on_keys, &mut filter, left_table, right_table) - .await?; + self.extract_join_keys(expr, &mut on_keys, &mut filter, left_table, right_table)?; // combine multiple filter exprs into one BinaryExpr let join_filter = filter @@ -483,8 +468,7 @@ impl Binder { /// foo = bar AND bar = baz => accum=[(foo, bar), (bar, baz)] accum_filter=[] /// foo = bar AND baz > 1 => accum=[(foo, bar)] accum_filter=[baz > 1] /// ``` - #[async_recursion] - async fn extract_join_keys( + fn extract_join_keys( &mut self, expr: &Expr, accum: &mut Vec<(ScalarExpression, ScalarExpression)>, @@ -495,8 +479,8 @@ impl Binder { match expr { Expr::BinaryOp { left, op, right } => match op { ast::BinaryOperator::Eq => { - let left = self.bind_expr(left).await?; - let right = self.bind_expr(right).await?; + let left = self.bind_expr(left)?; + let right = self.bind_expr(right)?; match (&left, &right) { // example: foo = bar @@ -511,12 +495,12 @@ impl Binder { { accum.push((right, left)); } else { - accum_filter.push(self.bind_expr(expr).await?); + accum_filter.push(self.bind_expr(expr)?); } } // example: baz = 1 _other => { - accum_filter.push(self.bind_expr(expr).await?); + accum_filter.push(self.bind_expr(expr)?); } } } @@ -529,26 +513,24 @@ impl Binder { accum_filter, left_schema, right_schema, - ) - .await?; + )?; self.extract_join_keys( right, accum, accum_filter, left_schema, right_schema, - ) - .await?; + )?; } } _other => { // example: baz > 1 - accum_filter.push(self.bind_expr(expr).await?); + accum_filter.push(self.bind_expr(expr)?); } }, _other => { // example: baz in (xxx), something else will convert to filter logic - accum_filter.push(self.bind_expr(expr).await?); + accum_filter.push(self.bind_expr(expr)?); } } Ok(()) diff --git a/src/binder/show.rs b/src/binder/show.rs index 31cf4935..f5855a5d 100644 --- a/src/binder/show.rs +++ b/src/binder/show.rs @@ -2,9 +2,9 @@ use crate::binder::{BindError, Binder}; use crate::planner::operator::show::ShowTablesOperator; use crate::planner::operator::Operator; use crate::planner::LogicalPlan; -use crate::storage::Storage; +use crate::storage::Transaction; -impl Binder { +impl<'a, T: Transaction> Binder<'a, T> { pub(crate) fn bind_show_tables(&mut self) -> Result { let plan = LogicalPlan { operator: Operator::Show(ShowTablesOperator {}), diff --git a/src/binder/truncate.rs b/src/binder/truncate.rs index b670ad0b..4a34478e 100644 --- a/src/binder/truncate.rs +++ b/src/binder/truncate.rs @@ -2,15 +2,12 @@ use crate::binder::{lower_case_name, split_name, BindError, Binder}; use crate::planner::operator::truncate::TruncateOperator; use crate::planner::operator::Operator; use crate::planner::LogicalPlan; -use crate::storage::Storage; +use crate::storage::Transaction; use sqlparser::ast::ObjectName; use std::sync::Arc; -impl Binder { - pub(crate) async fn bind_truncate( - &mut self, - name: &ObjectName, - ) -> Result { +impl<'a, T: Transaction> Binder<'a, T> { + pub(crate) fn bind_truncate(&mut self, name: &ObjectName) -> Result { let name = lower_case_name(&name); let (_, name) = split_name(&name)?; let table_name = Arc::new(name.to_string()); diff --git a/src/binder/update.rs b/src/binder/update.rs index d6504562..868d2141 100644 --- a/src/binder/update.rs +++ b/src/binder/update.rs @@ -3,14 +3,14 @@ use crate::expression::ScalarExpression; use crate::planner::operator::update::UpdateOperator; use crate::planner::operator::Operator; use crate::planner::LogicalPlan; -use crate::storage::Storage; +use crate::storage::Transaction; use crate::types::value::ValueRef; use sqlparser::ast::{Assignment, Expr, TableFactor, TableWithJoins}; use std::slice; use std::sync::Arc; -impl Binder { - pub(crate) async fn bind_update( +impl<'a, T: Transaction> Binder<'a, T> { + pub(crate) fn bind_update( &mut self, to: &TableWithJoins, selection: &Option, @@ -21,10 +21,10 @@ impl Binder { let (_, name) = split_name(&name)?; let table_name = Arc::new(name.to_string()); - let mut plan = self.bind_table_ref(slice::from_ref(to)).await?; + let mut plan = self.bind_table_ref(slice::from_ref(to))?; if let Some(predicate) = selection { - plan = self.bind_where(plan, predicate).await?; + plan = self.bind_where(plan, predicate)?; } let bind_table_name = Some(table_name.to_string()); @@ -33,19 +33,16 @@ impl Binder { let mut row = Vec::with_capacity(assignments.len()); for assignment in assignments { - let value = match self.bind_expr(&assignment.value).await? { + let value = match self.bind_expr(&assignment.value)? { ScalarExpression::Constant(value) => Ok::(value), _ => unreachable!(), }?; for ident in &assignment.id { - match self - .bind_column_ref_from_identifiers( - slice::from_ref(&ident), - bind_table_name.as_ref(), - ) - .await? - { + match self.bind_column_ref_from_identifiers( + slice::from_ref(&ident), + bind_table_name.as_ref(), + )? { ScalarExpression::ColumnRef(catalog) => { value.check_len(catalog.datatype())?; columns.push(catalog); diff --git a/src/db.rs b/src/db.rs index 1ee0d0cd..37c25d77 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,4 +1,5 @@ use sqlparser::parser::ParserError; +use std::cell::RefCell; use std::path::PathBuf; use crate::binder::{BindError, Binder, BinderContext}; @@ -11,23 +12,13 @@ use crate::optimizer::OptimizerError; use crate::parser::parse_sql; use crate::planner::LogicalPlan; use crate::storage::kip::KipStorage; -use crate::storage::memory::MemStorage; -use crate::storage::{Storage, StorageError}; +use crate::storage::{Storage, StorageError, Transaction}; use crate::types::tuple::Tuple; pub struct Database { pub storage: S, } -impl Database { - /// Create a new Database instance With Memory. - pub async fn with_mem() -> Self { - let storage = MemStorage::new(); - - Database { storage } - } -} - impl Database { /// Create a new Database instance With KipDB. pub async fn with_kipdb(path: impl Into + Send) -> Result { @@ -45,6 +36,8 @@ impl Database { /// Run SQL queries. pub async fn run(&self, sql: &str) -> Result, DatabaseError> { + let transaction = self.storage.transaction().await?; + // parse let stmts = parse_sql(sql)?; @@ -52,7 +45,7 @@ impl Database { return Ok(vec![]); } - let binder = Binder::new(BinderContext::new(self.storage.clone())); + let binder = Binder::new(BinderContext::new(&transaction)); /// Build a logical plan. /// @@ -61,15 +54,19 @@ impl Database { /// Sort(a) /// Limit(1) /// Project(a,b) - let source_plan = binder.bind(&stmts[0]).await?; + let source_plan = binder.bind(&stmts[0])?; // println!("source_plan plan: {:#?}", source_plan); let best_plan = Self::default_optimizer(source_plan).find_best()?; // println!("best_plan plan: {:#?}", best_plan); - let mut stream = build(best_plan, &self.storage); + let transaction = RefCell::new(transaction); + let mut stream = build(best_plan, &transaction); + let tuples = try_collect(&mut stream).await?; + + transaction.into_inner().commit().await?; - Ok(try_collect(&mut stream).await?) + Ok(tuples) } fn default_optimizer(source_plan: LogicalPlan) -> HepOptimizer { @@ -151,15 +148,15 @@ pub enum DatabaseError { #[cfg(test)] mod test { - use crate::catalog::{ColumnCatalog, ColumnDesc, TableName}; + use crate::catalog::{ColumnCatalog, ColumnDesc}; use crate::db::{Database, DatabaseError}; - use crate::storage::{Storage, StorageError}; + use crate::storage::{Storage, StorageError, Transaction}; use crate::types::tuple::create_table; use crate::types::LogicalType; use std::sync::Arc; use tempfile::TempDir; - async fn build_table(storage: &impl Storage) -> Result { + async fn build_table(mut transaction: impl Transaction) -> Result<(), StorageError> { let columns = vec![ ColumnCatalog::new( "c1".to_string(), @@ -174,17 +171,19 @@ mod test { None, ), ]; + let _ = transaction.create_table(Arc::new("t1".to_string()), columns)?; + transaction.commit().await?; - Ok(storage - .create_table(Arc::new("t1".to_string()), columns) - .await?) + Ok(()) } #[tokio::test] async fn test_run_sql() -> Result<(), DatabaseError> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let database = Database::with_kipdb(temp_dir.path()).await?; - let _ = build_table(&database.storage).await?; + let transaction = database.storage.transaction().await?; + build_table(transaction).await?; + let batch = database.run("select * from t1").await?; println!("{:#?}", batch); diff --git a/src/execution/executor/ddl/create_table.rs b/src/execution/executor/ddl/create_table.rs index 253c6074..f395bbe7 100644 --- a/src/execution/executor/ddl/create_table.rs +++ b/src/execution/executor/ddl/create_table.rs @@ -1,10 +1,11 @@ use crate::execution::executor::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::planner::operator::create_table::CreateTableOperator; -use crate::storage::Storage; +use crate::storage::Transaction; use crate::types::tuple::Tuple; use crate::types::tuple_builder::TupleBuilder; use futures_async_stream::try_stream; +use std::cell::RefCell; pub struct CreateTable { op: CreateTableOperator, @@ -16,23 +17,24 @@ impl From for CreateTable { } } -impl Executor for CreateTable { - fn execute(self, storage: &S) -> BoxedExecutor { - self._execute(storage.clone()) +impl Executor for CreateTable { + fn execute(self, _inputs: Vec, transaction: &RefCell) -> BoxedExecutor { + unsafe { self._execute(transaction.as_ptr().as_mut().unwrap()) } } } impl CreateTable { #[try_stream(boxed, ok = Tuple, error = ExecutorError)] - pub async fn _execute(self, storage: S) { + pub async fn _execute(self, transaction: &mut T) { let CreateTableOperator { table_name, columns, } = self.op; - let _ = storage.create_table(table_name.clone(), columns).await?; + let _ = transaction.create_table(table_name.clone(), columns)?; let tuple_builder = TupleBuilder::new_result(); let tuple = tuple_builder .push_result("CREATE TABLE SUCCESS", format!("{}", table_name).as_str())?; + yield tuple; } } diff --git a/src/execution/executor/ddl/drop_table.rs b/src/execution/executor/ddl/drop_table.rs index 3541f57b..0136153f 100644 --- a/src/execution/executor/ddl/drop_table.rs +++ b/src/execution/executor/ddl/drop_table.rs @@ -1,9 +1,10 @@ use crate::execution::executor::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::planner::operator::drop_table::DropTableOperator; -use crate::storage::Storage; +use crate::storage::Transaction; use crate::types::tuple::Tuple; use futures_async_stream::try_stream; +use std::cell::RefCell; pub struct DropTable { op: DropTableOperator, @@ -15,17 +16,17 @@ impl From for DropTable { } } -impl Executor for DropTable { - fn execute(self, storage: &S) -> BoxedExecutor { - self._execute(storage.clone()) +impl Executor for DropTable { + fn execute(self, _inputs: Vec, transaction: &RefCell) -> BoxedExecutor { + unsafe { self._execute(transaction.as_ptr().as_mut().unwrap()) } } } impl DropTable { #[try_stream(boxed, ok = Tuple, error = ExecutorError)] - pub async fn _execute(self, storage: S) { + pub async fn _execute(self, transaction: &mut T) { let DropTableOperator { table_name } = self.op; - storage.drop_table(&table_name).await?; + transaction.drop_table(&table_name)?; } } diff --git a/src/execution/executor/ddl/truncate.rs b/src/execution/executor/ddl/truncate.rs index 5be57612..150bb45b 100644 --- a/src/execution/executor/ddl/truncate.rs +++ b/src/execution/executor/ddl/truncate.rs @@ -1,9 +1,10 @@ use crate::execution::executor::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::planner::operator::truncate::TruncateOperator; -use crate::storage::Storage; +use crate::storage::Transaction; use crate::types::tuple::Tuple; use futures_async_stream::try_stream; +use std::cell::RefCell; pub struct Truncate { op: TruncateOperator, @@ -15,17 +16,17 @@ impl From for Truncate { } } -impl Executor for Truncate { - fn execute(self, storage: &S) -> BoxedExecutor { - self._execute(storage.clone()) +impl Executor for Truncate { + fn execute(self, _inputs: Vec, transaction: &RefCell) -> BoxedExecutor { + unsafe { self._execute(transaction.as_ptr().as_mut().unwrap()) } } } impl Truncate { #[try_stream(boxed, ok = Tuple, error = ExecutorError)] - pub async fn _execute(self, storage: S) { + pub async fn _execute(self, transaction: &mut T) { let TruncateOperator { table_name } = self.op; - storage.drop_data(&table_name).await?; + transaction.drop_data(&table_name)?; } } diff --git a/src/execution/executor/dml/copy_from_file.rs b/src/execution/executor/dml/copy_from_file.rs index e560aacd..89e460f2 100644 --- a/src/execution/executor/dml/copy_from_file.rs +++ b/src/execution/executor/dml/copy_from_file.rs @@ -2,10 +2,11 @@ use crate::binder::copy::FileFormat; use crate::execution::executor::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::planner::operator::copy_from_file::CopyFromFileOperator; -use crate::storage::{Storage, Transaction}; +use crate::storage::Transaction; use crate::types::tuple::Tuple; use crate::types::tuple_builder::TupleBuilder; use futures_async_stream::try_stream; +use std::cell::RefCell; use std::fs::File; use std::io::BufReader; use tokio::sync::mpsc::Sender; @@ -21,37 +22,34 @@ impl From for CopyFromFile { } } -impl Executor for CopyFromFile { - fn execute(self, storage: &S) -> BoxedExecutor { - self._execute(storage.clone()) +impl Executor for CopyFromFile { + fn execute(self, _inputs: Vec, transaction: &RefCell) -> BoxedExecutor { + unsafe { self._execute(transaction.as_ptr().as_mut().unwrap()) } } } impl CopyFromFile { #[try_stream(boxed, ok = Tuple, error = ExecutorError)] - pub async fn _execute(self, storage: S) { + pub async fn _execute(self, transaction: &mut T) { let (tx, mut rx) = tokio::sync::mpsc::channel(1); let (tx1, mut rx1) = tokio::sync::mpsc::channel(1); // # Cancellation // When this stream is dropped, the `rx` is dropped, the spawned task will fail to send to // `tx`, then the task will finish. let table_name = self.op.table.clone(); - if let Some(mut txn) = storage.transaction(&table_name).await { - let handle = tokio::task::spawn_blocking(|| self.read_file_blocking(tx)); - let mut size = 0 as usize; - while let Some(chunk) = rx.recv().await { - txn.append(chunk, false)?; - size += 1; - } - handle.await??; - txn.commit().await?; + let handle = tokio::task::spawn_blocking(|| self.read_file_blocking(tx)); + let mut size = 0 as usize; + while let Some(chunk) = rx.recv().await { + transaction.append(&table_name, chunk, false)?; + size += 1; + } + handle.await??; - let handle = tokio::task::spawn_blocking(move || return_result(size.clone(), tx1)); - while let Some(chunk) = rx1.recv().await { - yield chunk; - } - handle.await??; + let handle = tokio::task::spawn_blocking(move || return_result(size.clone(), tx1)); + while let Some(chunk) = rx1.recv().await { + yield chunk; } + handle.await??; } /// Read records from file using blocking IO. /// @@ -122,6 +120,7 @@ mod tests { use super::*; use crate::binder::copy::ExtSource; + use crate::storage::Storage; use crate::types::LogicalType; #[tokio::test] @@ -193,7 +192,13 @@ mod tests { let _ = db .run("create table test_copy (a int primary key, b float, c varchar(10))") .await; - let actual = executor.execute(&db.storage).next().await.unwrap()?; + let storage = db.storage; + let transaction = RefCell::new(storage.transaction().await?); + let actual = executor + .execute(vec![], &transaction) + .next() + .await + .unwrap()?; let tuple_builder = TupleBuilder::new_result(); let expected = tuple_builder diff --git a/src/execution/executor/dml/delete.rs b/src/execution/executor/dml/delete.rs index 32f8a974..caa219e9 100644 --- a/src/execution/executor/dml/delete.rs +++ b/src/execution/executor/dml/delete.rs @@ -2,38 +2,35 @@ use crate::catalog::TableName; use crate::execution::executor::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::planner::operator::delete::DeleteOperator; -use crate::storage::{Storage, Transaction}; +use crate::storage::Transaction; use crate::types::index::Index; use crate::types::tuple::Tuple; use futures_async_stream::try_stream; use itertools::Itertools; +use std::cell::RefCell; pub struct Delete { table_name: TableName, - input: BoxedExecutor, } -impl From<(DeleteOperator, BoxedExecutor)> for Delete { - fn from((DeleteOperator { table_name }, input): (DeleteOperator, BoxedExecutor)) -> Self { - Delete { table_name, input } +impl From for Delete { + fn from(DeleteOperator { table_name }: DeleteOperator) -> Delete { + Delete { table_name } } } -impl Executor for Delete { - fn execute(self, storage: &S) -> BoxedExecutor { - self._execute(storage.clone()) +impl Executor for Delete { + fn execute(self, inputs: Vec, transaction: &RefCell) -> BoxedExecutor { + unsafe { self._execute(transaction.as_ptr().as_mut().unwrap(), inputs) } } } impl Delete { #[try_stream(boxed, ok = Tuple, error = ExecutorError)] - pub async fn _execute(self, storage: S) { - let Delete { table_name, input } = self; - - if let Some(mut transaction) = storage.transaction(&table_name).await { - let table_catalog = storage.table(&table_name).await.unwrap(); - - let vec = table_catalog + async fn _execute(self, transaction: &mut T, mut inputs: Vec) { + let Delete { table_name } = self; + let option_index_metas = transaction.table(&table_name).map(|table_catalog| { + table_catalog .all_columns() .into_iter() .enumerate() @@ -44,18 +41,20 @@ impl Delete { col.id.and_then(|col_id| { table_catalog .get_unique_index(&col_id) - .map(|index_meta| (i, index_meta)) + .map(|index_meta| (i, index_meta.clone())) }) }) .flatten() }) - .collect_vec(); + .collect_vec() + }); + if let Some(index_metas) = option_index_metas { #[for_await] - for tuple in input { + for tuple in inputs.remove(0) { let tuple: Tuple = tuple?; - for (i, index_meta) in vec.iter() { + for (i, index_meta) in index_metas.iter() { let value = &tuple.values[*i]; if !value.is_null() { @@ -64,15 +63,14 @@ impl Delete { column_values: vec![value.clone()], }; - transaction.del_index(&index)?; + transaction.del_index(&table_name, &index)?; } } if let Some(tuple_id) = tuple.id { - transaction.delete(tuple_id)?; + transaction.delete(&table_name, tuple_id)?; } } - transaction.commit().await?; } } } diff --git a/src/execution/executor/dml/insert.rs b/src/execution/executor/dml/insert.rs index 42993ff9..ac7ad57c 100644 --- a/src/execution/executor/dml/insert.rs +++ b/src/execution/executor/dml/insert.rs @@ -2,61 +2,57 @@ use crate::catalog::TableName; use crate::execution::executor::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::planner::operator::insert::InsertOperator; -use crate::storage::{Storage, Transaction}; +use crate::storage::Transaction; use crate::types::index::Index; use crate::types::tuple::Tuple; use crate::types::value::DataValue; use futures_async_stream::try_stream; +use std::cell::RefCell; use std::collections::HashMap; use std::sync::Arc; pub struct Insert { table_name: TableName, - input: BoxedExecutor, is_overwrite: bool, } -impl From<(InsertOperator, BoxedExecutor)> for Insert { +impl From for Insert { fn from( - ( - InsertOperator { - table_name, - is_overwrite, - }, - input, - ): (InsertOperator, BoxedExecutor), - ) -> Self { + InsertOperator { + table_name, + is_overwrite, + }: InsertOperator, + ) -> Insert { Insert { table_name, - input, is_overwrite, } } } -impl Executor for Insert { - fn execute(self, storage: &S) -> BoxedExecutor { - self._execute(storage.clone()) +impl Executor for Insert { + fn execute(self, inputs: Vec, transaction: &RefCell) -> BoxedExecutor { + unsafe { self._execute(transaction.as_ptr().as_mut().unwrap(), inputs) } } } impl Insert { #[try_stream(boxed, ok = Tuple, error = ExecutorError)] - pub async fn _execute(self, storage: S) { + pub async fn _execute( + self, + transaction: &mut T, + mut inputs: Vec, + ) { let Insert { table_name, - input, is_overwrite, } = self; let mut primary_key_index = None; let mut unique_values = HashMap::new(); - if let (Some(table_catalog), Some(mut transaction)) = ( - storage.table(&table_name).await, - storage.transaction(&table_name).await, - ) { + if let Some(table_catalog) = transaction.table(&table_name).cloned() { #[for_await] - for tuple in input { + for tuple in inputs.remove(0) { let Tuple { columns, values, .. } = tuple?; @@ -104,7 +100,7 @@ impl Insert { tuple.values.push(value) } - transaction.append(tuple, is_overwrite)?; + transaction.append(&table_name, tuple, is_overwrite)?; } // Unique Index for (col_id, values) in unique_values { @@ -115,12 +111,10 @@ impl Insert { column_values: vec![value], }; - transaction.add_index(index, vec![tuple_id], true)?; + transaction.add_index(&table_name, index, vec![tuple_id], true)?; } } } - - transaction.commit().await?; } } } diff --git a/src/execution/executor/dml/update.rs b/src/execution/executor/dml/update.rs index 9fbc0e56..6c6d12ab 100644 --- a/src/execution/executor/dml/update.rs +++ b/src/execution/executor/dml/update.rs @@ -2,56 +2,44 @@ use crate::catalog::TableName; use crate::execution::executor::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::planner::operator::update::UpdateOperator; -use crate::storage::{Storage, Transaction}; +use crate::storage::Transaction; use crate::types::index::Index; use crate::types::tuple::Tuple; use futures_async_stream::try_stream; +use std::cell::RefCell; use std::collections::HashMap; pub struct Update { table_name: TableName, - input: BoxedExecutor, - values: BoxedExecutor, } -impl From<(UpdateOperator, BoxedExecutor, BoxedExecutor)> for Update { - fn from( - (UpdateOperator { table_name }, input, values): ( - UpdateOperator, - BoxedExecutor, - BoxedExecutor, - ), - ) -> Self { - Update { - table_name, - input, - values, - } +impl From for Update { + fn from(UpdateOperator { table_name }: UpdateOperator) -> Update { + Update { table_name } } } -impl Executor for Update { - fn execute(self, storage: &S) -> BoxedExecutor { - self._execute(storage.clone()) +impl Executor for Update { + fn execute(self, inputs: Vec, transaction: &RefCell) -> BoxedExecutor { + unsafe { self._execute(transaction.as_ptr().as_mut().unwrap(), inputs) } } } impl Update { #[try_stream(boxed, ok = Tuple, error = ExecutorError)] - pub async fn _execute(self, storage: S) { - let Update { - table_name, - input, - values, - } = self; + pub async fn _execute( + self, + transaction: &mut T, + mut inputs: Vec, + ) { + let Update { table_name } = self; - if let Some(mut transaction) = storage.transaction(&table_name).await { - let table_catalog = storage.table(&table_name).await.unwrap(); + if let Some(table_catalog) = transaction.table(&table_name).cloned() { let mut value_map = HashMap::new(); // only once #[for_await] - for tuple in values { + for tuple in inputs.remove(1) { let Tuple { columns, values, .. } = tuple?; @@ -60,7 +48,7 @@ impl Update { } } #[for_await] - for tuple in input { + for tuple in inputs.remove(0) { let mut tuple: Tuple = tuple?; let mut is_overwrite = true; @@ -69,7 +57,7 @@ impl Update { if column.desc.is_primary { let old_key = tuple.id.replace(value.clone()).unwrap(); - transaction.delete(old_key)?; + transaction.delete(&table_name, old_key)?; is_overwrite = false; } if column.desc.is_unique && value != &tuple.values[i] { @@ -80,11 +68,12 @@ impl Update { id: index_meta.id, column_values: vec![tuple.values[i].clone()], }; - transaction.del_index(&index)?; + transaction.del_index(&table_name, &index)?; if !value.is_null() { index.column_values[0] = value.clone(); transaction.add_index( + &table_name, index, vec![tuple.id.clone().unwrap()], true, @@ -97,10 +86,8 @@ impl Update { } } - transaction.append(tuple, is_overwrite)?; + transaction.append(&table_name, tuple, is_overwrite)?; } - - transaction.commit().await?; } } } diff --git a/src/execution/executor/dql/aggregate/hash_agg.rs b/src/execution/executor/dql/aggregate/hash_agg.rs index 8cfc5779..3d9ea1d7 100644 --- a/src/execution/executor/dql/aggregate/hash_agg.rs +++ b/src/execution/executor/dql/aggregate/hash_agg.rs @@ -3,51 +3,47 @@ use crate::execution::executor::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::expression::ScalarExpression; use crate::planner::operator::aggregate::AggregateOperator; -use crate::storage::Storage; +use crate::storage::Transaction; use crate::types::tuple::Tuple; use crate::types::value::ValueRef; use ahash::{HashMap, HashMapExt}; use futures_async_stream::try_stream; use itertools::Itertools; +use std::cell::RefCell; pub struct HashAggExecutor { pub agg_calls: Vec, pub groupby_exprs: Vec, - pub input: BoxedExecutor, } -impl From<(AggregateOperator, BoxedExecutor)> for HashAggExecutor { +impl From for HashAggExecutor { fn from( - ( - AggregateOperator { - agg_calls, - groupby_exprs, - }, - input, - ): (AggregateOperator, BoxedExecutor), - ) -> Self { + AggregateOperator { + agg_calls, + groupby_exprs, + }: AggregateOperator, + ) -> HashAggExecutor { HashAggExecutor { agg_calls, groupby_exprs, - input, } } } -impl Executor for HashAggExecutor { - fn execute(self, _: &S) -> BoxedExecutor { - self._execute() +impl Executor for HashAggExecutor { + fn execute<'a>(self, inputs: Vec, _transaction: &RefCell) -> BoxedExecutor { + self._execute(inputs) } } impl HashAggExecutor { #[try_stream(boxed, ok = Tuple, error = ExecutorError)] - pub async fn _execute(self) { + pub async fn _execute(self, mut inputs: Vec) { let mut group_and_agg_columns_option = None; let mut group_hash_accs = HashMap::new(); #[for_await] - for tuple in self.input { + for tuple in inputs.remove(0) { let tuple = tuple?; // 1. build group and agg columns for hash_agg columns. @@ -120,16 +116,21 @@ mod test { use crate::expression::ScalarExpression; use crate::planner::operator::aggregate::AggregateOperator; use crate::planner::operator::values::ValuesOperator; - use crate::storage::memory::MemStorage; + use crate::storage::kip::KipStorage; + use crate::storage::Storage; use crate::types::tuple::create_table; use crate::types::value::DataValue; use crate::types::LogicalType; use itertools::Itertools; + use std::cell::RefCell; use std::sync::Arc; + use tempfile::TempDir; #[tokio::test] async fn test_hash_agg() -> Result<(), ExecutorError> { - let mem_storage = MemStorage::new(); + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); + let storage = KipStorage::new(temp_dir.path()).await.unwrap(); + let transaction = RefCell::new(storage.transaction().await?); let desc = ColumnDesc::new(LogicalType::Integer, false, false); let t1_columns = vec![ @@ -188,10 +189,10 @@ mod test { ], columns: t1_columns, }) - .execute(&mem_storage); + .execute(vec![], &transaction); let tuples = - try_collect(&mut HashAggExecutor::from((operator, input)).execute(&mem_storage)) + try_collect(&mut HashAggExecutor::from(operator).execute(vec![input], &transaction)) .await?; println!("hash_agg_test: \n{}", create_table(&tuples)); diff --git a/src/execution/executor/dql/aggregate/simple_agg.rs b/src/execution/executor/dql/aggregate/simple_agg.rs index 8a00a045..60499c84 100644 --- a/src/execution/executor/dql/aggregate/simple_agg.rs +++ b/src/execution/executor/dql/aggregate/simple_agg.rs @@ -3,39 +3,37 @@ use crate::execution::executor::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::expression::ScalarExpression; use crate::planner::operator::aggregate::AggregateOperator; -use crate::storage::Storage; +use crate::storage::Transaction; use crate::types::tuple::Tuple; use crate::types::value::ValueRef; use futures_async_stream::try_stream; use itertools::Itertools; +use std::cell::RefCell; pub struct SimpleAggExecutor { pub agg_calls: Vec, - pub input: BoxedExecutor, } -impl From<(AggregateOperator, BoxedExecutor)> for SimpleAggExecutor { - fn from( - (AggregateOperator { agg_calls, .. }, input): (AggregateOperator, BoxedExecutor), - ) -> Self { - SimpleAggExecutor { agg_calls, input } +impl From for SimpleAggExecutor { + fn from(AggregateOperator { agg_calls, .. }: AggregateOperator) -> SimpleAggExecutor { + SimpleAggExecutor { agg_calls } } } -impl Executor for SimpleAggExecutor { - fn execute(self, _: &S) -> BoxedExecutor { - self._execute() +impl Executor for SimpleAggExecutor { + fn execute(self, inputs: Vec, _transaction: &RefCell) -> BoxedExecutor { + self._execute(inputs) } } impl SimpleAggExecutor { #[try_stream(boxed, ok = Tuple, error = ExecutorError)] - pub async fn _execute(self) { + pub async fn _execute(self, mut inputs: Vec) { let mut accs = create_accumulators(&self.agg_calls); let mut columns_option = None; #[for_await] - for tuple in self.input { + for tuple in inputs.remove(0) { let tuple = tuple?; columns_option.get_or_insert_with(|| { diff --git a/src/execution/executor/dql/dummy.rs b/src/execution/executor/dql/dummy.rs index 9b9c7e5e..54ff607f 100644 --- a/src/execution/executor/dql/dummy.rs +++ b/src/execution/executor/dql/dummy.rs @@ -1,13 +1,14 @@ use crate::execution::executor::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; -use crate::storage::Storage; +use crate::storage::Transaction; use crate::types::tuple::Tuple; use futures_async_stream::try_stream; +use std::cell::RefCell; pub struct Dummy {} -impl Executor for Dummy { - fn execute(self, _: &S) -> BoxedExecutor { +impl Executor for Dummy { + fn execute<'a>(self, _inputs: Vec, _transaction: &RefCell) -> BoxedExecutor { self._execute() } } diff --git a/src/execution/executor/dql/filter.rs b/src/execution/executor/dql/filter.rs index a3090548..30292dd9 100644 --- a/src/execution/executor/dql/filter.rs +++ b/src/execution/executor/dql/filter.rs @@ -2,35 +2,35 @@ use crate::execution::executor::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::expression::ScalarExpression; use crate::planner::operator::filter::FilterOperator; -use crate::storage::Storage; +use crate::storage::Transaction; use crate::types::tuple::Tuple; use crate::types::value::DataValue; use futures_async_stream::try_stream; +use std::cell::RefCell; pub struct Filter { predicate: ScalarExpression, - input: BoxedExecutor, } -impl From<(FilterOperator, BoxedExecutor)> for Filter { - fn from((FilterOperator { predicate, .. }, input): (FilterOperator, BoxedExecutor)) -> Self { - Filter { predicate, input } +impl From for Filter { + fn from(FilterOperator { predicate, .. }: FilterOperator) -> Filter { + Filter { predicate } } } -impl Executor for Filter { - fn execute(self, _: &S) -> BoxedExecutor { - self._execute() +impl Executor for Filter { + fn execute(self, inputs: Vec, _transaction: &RefCell) -> BoxedExecutor { + self._execute(inputs) } } impl Filter { #[try_stream(boxed, ok = Tuple, error = ExecutorError)] - pub async fn _execute(self) { - let Filter { predicate, input } = self; + pub async fn _execute(self, mut inputs: Vec) { + let Filter { predicate } = self; #[for_await] - for tuple in input { + for tuple in inputs.remove(0) { let tuple = tuple?; if let DataValue::Boolean(option) = predicate.eval_column(&tuple)?.as_ref() { if let Some(true) = option { diff --git a/src/execution/executor/dql/index_scan.rs b/src/execution/executor/dql/index_scan.rs index 1fdc77f8..de04d4af 100644 --- a/src/execution/executor/dql/index_scan.rs +++ b/src/execution/executor/dql/index_scan.rs @@ -1,10 +1,11 @@ use crate::execution::executor::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::planner::operator::scan::ScanOperator; -use crate::storage::{Iter, Storage, Transaction}; +use crate::storage::{Iter, Transaction}; use crate::types::errors::TypeError; use crate::types::tuple::Tuple; use futures_async_stream::try_stream; +use std::cell::RefCell; pub(crate) struct IndexScan { op: ScanOperator, @@ -16,15 +17,15 @@ impl From for IndexScan { } } -impl Executor for IndexScan { - fn execute(self, storage: &S) -> BoxedExecutor { - self._execute(storage.clone()) +impl Executor for IndexScan { + fn execute(self, _inputs: Vec, transaction: &RefCell) -> BoxedExecutor { + unsafe { self._execute(transaction.as_ptr().as_ref().unwrap()) } } } impl IndexScan { #[try_stream(boxed, ok = Tuple, error = ExecutorError)] - pub async fn _execute(self, storage: S) { + pub async fn _execute(self, transaction: &T) { let ScanOperator { table_name, columns, @@ -33,13 +34,11 @@ impl IndexScan { .. } = self.op; let (index_meta, binaries) = index_by.ok_or(TypeError::InvalidType)?; + let mut iter = + transaction.read_by_index(&table_name, limit, columns, index_meta, binaries)?; - if let Some(transaction) = storage.transaction(&table_name).await { - let mut iter = transaction.read_by_index(limit, columns, index_meta, binaries)?; - - while let Some(tuple) = iter.next_tuple()? { - yield tuple; - } + while let Some(tuple) = iter.next_tuple()? { + yield tuple; } } } diff --git a/src/execution/executor/dql/join/hash_join.rs b/src/execution/executor/dql/join/hash_join.rs index ed8b968f..41d8ac39 100644 --- a/src/execution/executor/dql/join/hash_join.rs +++ b/src/execution/executor/dql/join/hash_join.rs @@ -4,54 +4,37 @@ use crate::execution::executor::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::expression::ScalarExpression; use crate::planner::operator::join::{JoinCondition, JoinOperator, JoinType}; -use crate::storage::Storage; +use crate::storage::Transaction; use crate::types::errors::TypeError; use crate::types::tuple::Tuple; use crate::types::value::DataValue; use ahash::{HashMap, HashMapExt, HashSet, HashSetExt, RandomState}; use futures_async_stream::try_stream; use itertools::Itertools; +use std::cell::RefCell; use std::sync::Arc; pub struct HashJoin { on: JoinCondition, ty: JoinType, - left_input: BoxedExecutor, - right_input: BoxedExecutor, } -impl From<(JoinOperator, BoxedExecutor, BoxedExecutor)> for HashJoin { - fn from( - (JoinOperator { on, join_type }, left_input, right_input): ( - JoinOperator, - BoxedExecutor, - BoxedExecutor, - ), - ) -> Self { - HashJoin { - on, - ty: join_type, - left_input, - right_input, - } +impl From for HashJoin { + fn from(JoinOperator { on, join_type }: JoinOperator) -> HashJoin { + HashJoin { on, ty: join_type } } } -impl Executor for HashJoin { - fn execute(self, _: &S) -> BoxedExecutor { - self._execute() +impl Executor for HashJoin { + fn execute(self, inputs: Vec, _transaction: &RefCell) -> BoxedExecutor { + self._execute(inputs) } } impl HashJoin { #[try_stream(boxed, ok = Tuple, error = ExecutorError)] - pub async fn _execute(self) { - let HashJoin { - on, - ty, - left_input, - right_input, - } = self; + pub async fn _execute(self, mut inputs: Vec) { + let HashJoin { on, ty } = self; if ty == JoinType::Cross { unreachable!("Cross join should not be in HashJoinExecutor"); @@ -76,7 +59,7 @@ impl HashJoin { // 2.merged all left tuples. let mut left_init_flag = false; #[for_await] - for tuple in left_input { + for tuple in inputs.remove(0) { let tuple: Tuple = tuple?; let hash = Self::hash_row(&on_left_keys, &hash_random_state, &tuple)?; @@ -91,7 +74,7 @@ impl HashJoin { // probe phase let mut right_init_flag = false; #[for_await] - for tuple in right_input { + for tuple in inputs.remove(0) { let tuple: Tuple = tuple?; let right_cols_len = tuple.columns.len(); let hash = Self::hash_row(&on_right_keys, &hash_random_state, &tuple)?; @@ -256,15 +239,17 @@ mod test { use crate::expression::ScalarExpression; use crate::planner::operator::join::{JoinCondition, JoinOperator, JoinType}; use crate::planner::operator::values::ValuesOperator; - use crate::storage::memory::MemStorage; - use crate::storage::Storage; + use crate::storage::kip::KipStorage; + use crate::storage::{Storage, Transaction}; use crate::types::tuple::create_table; use crate::types::value::DataValue; use crate::types::LogicalType; + use std::cell::RefCell; use std::sync::Arc; + use tempfile::TempDir; - fn build_join_values( - _s: &S, + fn build_join_values( + _t: &RefCell, ) -> ( Vec<(ScalarExpression, ScalarExpression)>, BoxedExecutor, @@ -366,13 +351,19 @@ mod test { columns: t2_columns, }); - (on_keys, values_t1.execute(_s), values_t2.execute(_s)) + ( + on_keys, + values_t1.execute(vec![], &_t), + values_t2.execute(vec![], &_t), + ) } #[tokio::test] async fn test_inner_join() -> Result<(), ExecutorError> { - let mem_storage = MemStorage::new(); - let (keys, left, right) = build_join_values(&mem_storage); + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); + let storage = KipStorage::new(temp_dir.path()).await?; + let transaction = RefCell::new(storage.transaction().await?); + let (keys, left, right) = build_join_values(&transaction); let op = JoinOperator { on: JoinCondition::On { @@ -381,7 +372,7 @@ mod test { }, join_type: JoinType::Inner, }; - let mut executor = HashJoin::from((op, left, right)).execute(&mem_storage); + let mut executor = HashJoin::from(op).execute(vec![left, right], &transaction); let tuples = try_collect(&mut executor).await?; println!("inner_test: \n{}", create_table(&tuples)); @@ -406,8 +397,10 @@ mod test { #[tokio::test] async fn test_left_join() -> Result<(), ExecutorError> { - let mem_storage = MemStorage::new(); - let (keys, left, right) = build_join_values(&mem_storage); + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); + let storage = KipStorage::new(temp_dir.path()).await?; + let transaction = RefCell::new(storage.transaction().await?); + let (keys, left, right) = build_join_values(&transaction); let op = JoinOperator { on: JoinCondition::On { @@ -416,7 +409,7 @@ mod test { }, join_type: JoinType::Left, }; - let mut executor = HashJoin::from((op, left, right)).execute(&mem_storage); + let mut executor = HashJoin::from(op).execute(vec![left, right], &transaction); let tuples = try_collect(&mut executor).await?; println!("left_test: \n{}", create_table(&tuples)); @@ -445,8 +438,10 @@ mod test { #[tokio::test] async fn test_right_join() -> Result<(), ExecutorError> { - let mem_storage = MemStorage::new(); - let (keys, left, right) = build_join_values(&mem_storage); + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); + let storage = KipStorage::new(temp_dir.path()).await?; + let transaction = RefCell::new(storage.transaction().await?); + let (keys, left, right) = build_join_values(&transaction); let op = JoinOperator { on: JoinCondition::On { @@ -455,7 +450,7 @@ mod test { }, join_type: JoinType::Right, }; - let mut executor = HashJoin::from((op, left, right)).execute(&mem_storage); + let mut executor = HashJoin::from(op).execute(vec![left, right], &transaction); let tuples = try_collect(&mut executor).await?; println!("right_test: \n{}", create_table(&tuples)); @@ -484,8 +479,10 @@ mod test { #[tokio::test] async fn test_full_join() -> Result<(), ExecutorError> { - let mem_storage = MemStorage::new(); - let (keys, left, right) = build_join_values(&mem_storage); + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); + let storage = KipStorage::new(temp_dir.path()).await?; + let transaction = RefCell::new(storage.transaction().await?); + let (keys, left, right) = build_join_values(&transaction); let op = JoinOperator { on: JoinCondition::On { @@ -494,7 +491,7 @@ mod test { }, join_type: JoinType::Full, }; - let mut executor = HashJoin::from((op, left, right)).execute(&mem_storage); + let mut executor = HashJoin::from(op).execute(vec![left, right], &transaction); let tuples = try_collect(&mut executor).await?; println!("full_test: \n{}", create_table(&tuples)); diff --git a/src/execution/executor/dql/limit.rs b/src/execution/executor/dql/limit.rs index ab253fd4..a011af7d 100644 --- a/src/execution/executor/dql/limit.rs +++ b/src/execution/executor/dql/limit.rs @@ -1,41 +1,36 @@ use crate::execution::executor::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::planner::operator::limit::LimitOperator; -use crate::storage::Storage; +use crate::storage::Transaction; use crate::types::tuple::Tuple; use futures::StreamExt; use futures_async_stream::try_stream; +use std::cell::RefCell; pub struct Limit { offset: Option, limit: Option, - input: BoxedExecutor, } -impl From<(LimitOperator, BoxedExecutor)> for Limit { - fn from((LimitOperator { offset, limit }, input): (LimitOperator, BoxedExecutor)) -> Self { +impl From for Limit { + fn from(LimitOperator { offset, limit }: LimitOperator) -> Limit { Limit { offset: Some(offset), limit: Some(limit), - input, } } } -impl Executor for Limit { - fn execute(self, _: &S) -> BoxedExecutor { - self._execute() +impl Executor for Limit { + fn execute(self, inputs: Vec, _transaction: &RefCell) -> BoxedExecutor { + self._execute(inputs) } } impl Limit { #[try_stream(boxed, ok = Tuple, error = ExecutorError)] - pub async fn _execute(self) { - let Limit { - offset, - limit, - input, - } = self; + pub async fn _execute(self, mut inputs: Vec) { + let Limit { offset, limit } = self; if limit.is_some() && limit.unwrap() == 0 { return Ok(()); @@ -45,7 +40,7 @@ impl Limit { let offset_limit = offset_val + limit.unwrap_or(1) - 1; #[for_await] - for (i, tuple) in input.enumerate() { + for (i, tuple) in inputs.remove(0).enumerate() { if i < offset_val { continue; } else if i > offset_limit { diff --git a/src/execution/executor/dql/projection.rs b/src/execution/executor/dql/projection.rs index debbf6ad..cf27d6eb 100644 --- a/src/execution/executor/dql/projection.rs +++ b/src/execution/executor/dql/projection.rs @@ -2,37 +2,34 @@ use crate::execution::executor::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::expression::ScalarExpression; use crate::planner::operator::project::ProjectOperator; -use crate::storage::Storage; +use crate::storage::Transaction; use crate::types::tuple::Tuple; use futures_async_stream::try_stream; +use std::cell::RefCell; pub struct Projection { exprs: Vec, - input: BoxedExecutor, } -impl From<(ProjectOperator, BoxedExecutor)> for Projection { - fn from((ProjectOperator { columns }, input): (ProjectOperator, BoxedExecutor)) -> Self { - Projection { - exprs: columns, - input, - } +impl From for Projection { + fn from(ProjectOperator { columns }: ProjectOperator) -> Projection { + Projection { exprs: columns } } } -impl Executor for Projection { - fn execute(self, _: &S) -> BoxedExecutor { - self._execute() +impl Executor for Projection { + fn execute<'a>(self, inputs: Vec, _transaction: &RefCell) -> BoxedExecutor { + self._execute(inputs) } } impl Projection { #[try_stream(boxed, ok = Tuple, error = ExecutorError)] - pub async fn _execute(self) { - let Projection { exprs, input } = self; + pub async fn _execute(self, mut inputs: Vec) { + let Projection { exprs } = self; #[for_await] - for tuple in input { + for tuple in inputs.remove(0) { let tuple = tuple?; let mut columns = Vec::with_capacity(exprs.len()); diff --git a/src/execution/executor/dql/seq_scan.rs b/src/execution/executor/dql/seq_scan.rs index bb9f7daf..7cf15722 100644 --- a/src/execution/executor/dql/seq_scan.rs +++ b/src/execution/executor/dql/seq_scan.rs @@ -1,9 +1,10 @@ use crate::execution::executor::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::planner::operator::scan::ScanOperator; -use crate::storage::{Iter, Storage, Transaction}; +use crate::storage::{Iter, Transaction}; use crate::types::tuple::Tuple; use futures_async_stream::try_stream; +use std::cell::RefCell; pub(crate) struct SeqScan { op: ScanOperator, @@ -15,28 +16,25 @@ impl From for SeqScan { } } -impl Executor for SeqScan { - fn execute(self, storage: &S) -> BoxedExecutor { - self._execute(storage.clone()) +impl Executor for SeqScan { + fn execute(self, _inputs: Vec, transaction: &RefCell) -> BoxedExecutor { + unsafe { self._execute(transaction.as_ptr().as_ref().unwrap()) } } } impl SeqScan { #[try_stream(boxed, ok = Tuple, error = ExecutorError)] - pub async fn _execute(self, storage: S) { + pub async fn _execute(self, transaction: &T) { let ScanOperator { table_name, columns, limit, .. } = self.op; + let mut iter = transaction.read(&table_name, limit, columns)?; - if let Some(transaction) = storage.transaction(&table_name).await { - let mut iter = transaction.read(limit, columns)?; - - while let Some(tuple) = iter.next_tuple()? { - yield tuple; - } + while let Some(tuple) = iter.next_tuple()? { + yield tuple; } } } diff --git a/src/execution/executor/dql/sort.rs b/src/execution/executor/dql/sort.rs index 355edb83..37e9a84c 100644 --- a/src/execution/executor/dql/sort.rs +++ b/src/execution/executor/dql/sort.rs @@ -1,45 +1,37 @@ use crate::execution::executor::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::planner::operator::sort::{SortField, SortOperator}; -use crate::storage::Storage; +use crate::storage::Transaction; use crate::types::tuple::Tuple; use futures_async_stream::try_stream; +use std::cell::RefCell; use std::cmp::Ordering; pub struct Sort { sort_fields: Vec, limit: Option, - input: BoxedExecutor, } -impl From<(SortOperator, BoxedExecutor)> for Sort { - fn from((SortOperator { sort_fields, limit }, input): (SortOperator, BoxedExecutor)) -> Self { - Sort { - sort_fields, - limit, - input, - } +impl From for Sort { + fn from(SortOperator { sort_fields, limit }: SortOperator) -> Sort { + Sort { sort_fields, limit } } } -impl Executor for Sort { - fn execute(self, _: &S) -> BoxedExecutor { - self._execute() +impl Executor for Sort { + fn execute(self, inputs: Vec, _transaction: &RefCell) -> BoxedExecutor { + self._execute(inputs) } } impl Sort { #[try_stream(boxed, ok = Tuple, error = ExecutorError)] - pub async fn _execute(self) { - let Sort { - sort_fields, - limit, - input, - } = self; + pub async fn _execute(self, mut inputs: Vec) { + let Sort { sort_fields, limit } = self; let mut tuples: Vec = vec![]; #[for_await] - for tuple in input { + for tuple in inputs.remove(0) { tuples.push(tuple?); } diff --git a/src/execution/executor/dql/values.rs b/src/execution/executor/dql/values.rs index c94b5f37..a294533c 100644 --- a/src/execution/executor/dql/values.rs +++ b/src/execution/executor/dql/values.rs @@ -1,9 +1,10 @@ use crate::execution::executor::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::planner::operator::values::ValuesOperator; -use crate::storage::Storage; +use crate::storage::Transaction; use crate::types::tuple::Tuple; use futures_async_stream::try_stream; +use std::cell::RefCell; pub struct Values { op: ValuesOperator, @@ -15,8 +16,8 @@ impl From for Values { } } -impl Executor for Values { - fn execute(self, _: &S) -> BoxedExecutor { +impl Executor for Values { + fn execute(self, _inputs: Vec, _transaction: &RefCell) -> BoxedExecutor { self._execute() } } diff --git a/src/execution/executor/mod.rs b/src/execution/executor/mod.rs index 93d1a193..e8d53e66 100644 --- a/src/execution/executor/mod.rs +++ b/src/execution/executor/mod.rs @@ -25,89 +25,90 @@ use crate::execution::executor::show::show_table::ShowTables; use crate::execution::ExecutorError; use crate::planner::operator::Operator; use crate::planner::LogicalPlan; -use crate::storage::Storage; +use crate::storage::Transaction; use crate::types::tuple::Tuple; use futures::stream::BoxStream; use futures::TryStreamExt; +use std::cell::RefCell; pub type BoxedExecutor = BoxStream<'static, Result>; -pub trait Executor { - fn execute(self, storage: &S) -> BoxedExecutor; +pub trait Executor { + fn execute(self, inputs: Vec, transaction: &RefCell) -> BoxedExecutor; } -pub fn build(plan: LogicalPlan, storage: &S) -> BoxedExecutor { +pub fn build(plan: LogicalPlan, transaction: &RefCell) -> BoxedExecutor { let LogicalPlan { operator, mut childrens, } = plan; match operator { - Operator::Dummy => Dummy {}.execute(storage), + Operator::Dummy => Dummy {}.execute(vec![], transaction), Operator::Aggregate(op) => { - let input = build(childrens.remove(0), storage); + let input = build(childrens.remove(0), transaction); if op.groupby_exprs.is_empty() { - SimpleAggExecutor::from((op, input)).execute(storage) + SimpleAggExecutor::from(op).execute(vec![input], transaction) } else { - HashAggExecutor::from((op, input)).execute(storage) + HashAggExecutor::from(op).execute(vec![input], transaction) } } Operator::Filter(op) => { - let input = build(childrens.remove(0), storage); + let input = build(childrens.remove(0), transaction); - Filter::from((op, input)).execute(storage) + Filter::from(op).execute(vec![input], transaction) } Operator::Join(op) => { - let left_input = build(childrens.remove(0), storage); - let right_input = build(childrens.remove(0), storage); + let left_input = build(childrens.remove(0), transaction); + let right_input = build(childrens.remove(0), transaction); - HashJoin::from((op, left_input, right_input)).execute(storage) + HashJoin::from(op).execute(vec![left_input, right_input], transaction) } Operator::Project(op) => { - let input = build(childrens.remove(0), storage); + let input = build(childrens.remove(0), transaction); - Projection::from((op, input)).execute(storage) + Projection::from(op).execute(vec![input], transaction) } Operator::Scan(op) => { if op.index_by.is_some() { - IndexScan::from(op).execute(storage) + IndexScan::from(op).execute(vec![], transaction) } else { - SeqScan::from(op).execute(storage) + SeqScan::from(op).execute(vec![], transaction) } } Operator::Sort(op) => { - let input = build(childrens.remove(0), storage); + let input = build(childrens.remove(0), transaction); - Sort::from((op, input)).execute(storage) + Sort::from(op).execute(vec![input], transaction) } Operator::Limit(op) => { - let input = build(childrens.remove(0), storage); + let input = build(childrens.remove(0), transaction); - Limit::from((op, input)).execute(storage) + Limit::from(op).execute(vec![input], transaction) } Operator::Insert(op) => { - let input = build(childrens.remove(0), storage); + let input = build(childrens.remove(0), transaction); - Insert::from((op, input)).execute(storage) + Insert::from(op).execute(vec![input], transaction) } Operator::Update(op) => { - let input = build(childrens.remove(0), storage); - let values = build(childrens.remove(0), storage); + let input = build(childrens.remove(0), transaction); + let values = build(childrens.remove(0), transaction); - Update::from((op, input, values)).execute(storage) + Update::from(op).execute(vec![input, values], transaction) } Operator::Delete(op) => { - let input = build(childrens.remove(0), storage); + let input = build(childrens.remove(0), transaction); - Delete::from((op, input)).execute(storage) + Delete::from(op).execute(vec![input], transaction) } - Operator::Values(op) => Values::from(op).execute(storage), - Operator::CreateTable(op) => CreateTable::from(op).execute(storage), - Operator::DropTable(op) => DropTable::from(op).execute(storage), - Operator::Truncate(op) => Truncate::from(op).execute(storage), - Operator::Show(op) => ShowTables::from(op).execute(storage), - Operator::CopyFromFile(op) => CopyFromFile::from(op).execute(storage), + Operator::Values(op) => Values::from(op).execute(vec![], transaction), + Operator::CreateTable(op) => CreateTable::from(op).execute(vec![], transaction), + Operator::DropTable(op) => DropTable::from(op).execute(vec![], transaction), + Operator::Truncate(op) => Truncate::from(op).execute(vec![], transaction), + Operator::Show(op) => ShowTables::from(op).execute(vec![], transaction), + Operator::CopyFromFile(op) => CopyFromFile::from(op).execute(vec![], transaction), #[warn(unused_assignments)] Operator::CopyToFile(_op) => { todo!() diff --git a/src/execution/executor/show/show_table.rs b/src/execution/executor/show/show_table.rs index bff78db4..fcc8d7cf 100644 --- a/src/execution/executor/show/show_table.rs +++ b/src/execution/executor/show/show_table.rs @@ -3,10 +3,11 @@ use crate::catalog::ColumnRef; use crate::execution::executor::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::planner::operator::show::ShowTablesOperator; -use crate::storage::Storage; +use crate::storage::Transaction; use crate::types::tuple::Tuple; use crate::types::value::{DataValue, ValueRef}; use futures_async_stream::try_stream; +use std::cell::RefCell; use std::sync::Arc; pub struct ShowTables { @@ -19,16 +20,16 @@ impl From for ShowTables { } } -impl Executor for ShowTables { - fn execute(self, storage: &S) -> BoxedExecutor { - self._execute(storage.clone()) +impl Executor for ShowTables { + fn execute(self, _inputs: Vec, transaction: &RefCell) -> BoxedExecutor { + unsafe { self._execute(transaction.as_ptr().as_ref().unwrap()) } } } impl ShowTables { #[try_stream(boxed, ok = Tuple, error = ExecutorError)] - pub async fn _execute(self, storage: S) { - let tables = storage.show_tables().await?; + pub async fn _execute(self, transaction: &T) { + let tables = transaction.show_tables()?; for table in tables { let columns: Vec = diff --git a/src/expression/mod.rs b/src/expression/mod.rs index bd6cf28d..6c617fd8 100644 --- a/src/expression/mod.rs +++ b/src/expression/mod.rs @@ -9,7 +9,7 @@ use sqlparser::ast::{BinaryOperator as SqlBinaryOperator, UnaryOperator as SqlUn use self::agg::AggKind; use crate::catalog::{ColumnCatalog, ColumnDesc, ColumnRef}; -use crate::storage::Storage; +use crate::storage::Transaction; use crate::types::tuple::Tuple; use crate::types::value::ValueRef; use crate::types::LogicalType; @@ -146,7 +146,7 @@ impl ScalarExpression { exprs } - pub fn has_agg_call(&self, context: &BinderContext) -> bool { + pub fn has_agg_call(&self, context: &BinderContext<'_, T>) -> bool { match self { ScalarExpression::InputRef { index, .. } => context.agg_calls.get(*index).is_some(), ScalarExpression::AggCall { .. } => unreachable!(), diff --git a/src/storage/kip.rs b/src/storage/kip.rs index e9babe86..4be9e9f5 100644 --- a/src/storage/kip.rs +++ b/src/storage/kip.rs @@ -1,4 +1,4 @@ -use crate::catalog::{ColumnCatalog, TableCatalog, TableName}; +use crate::catalog::{ColumnCatalog, ColumnRef, TableCatalog, TableName}; use crate::expression::simplify::ConstantBinary; use crate::storage::table_codec::TableCodec; use crate::storage::{ @@ -14,7 +14,6 @@ use kip_db::kernel::lsm::mvcc::TransactionIter; use kip_db::kernel::lsm::storage::Config; use kip_db::kernel::lsm::{mvcc, storage}; use kip_db::kernel::utils::lru_cache::ShardingLruCache; -use kip_db::kernel::Storage as KipDBStorage; use std::collections::hash_map::RandomState; use std::collections::{Bound, VecDeque}; use std::mem; @@ -24,7 +23,6 @@ use std::sync::Arc; #[derive(Clone)] pub struct KipStorage { - cache: Arc>, pub inner: Arc, } @@ -34,225 +32,28 @@ impl KipStorage { let storage = storage::KipStorage::open_with_config(config).await?; Ok(KipStorage { - cache: Arc::new(ShardingLruCache::new(32, 16, RandomState::default())?), inner: Arc::new(storage), }) } - - fn column_collect( - name: &String, - tx: &mvcc::Transaction, - ) -> Result<(Vec, Option), StorageError> { - let (column_min, column_max) = TableCodec::columns_bound(name); - let mut column_iter = - tx.iter(Bound::Included(&column_min), Bound::Included(&column_max))?; - - let mut columns = vec![]; - let mut name_option = None; - - while let Some((_, value_option)) = column_iter.try_next().ok().flatten() { - if let Some(value) = value_option { - let (table_name, column) = TableCodec::decode_column(&value)?; - - if name != table_name.as_str() { - return Ok((vec![], None)); - } - let _ = name_option.insert(table_name); - - columns.push(column); - } - } - - Ok((columns, name_option)) - } - - fn index_meta_collect(name: &String, tx: &mvcc::Transaction) -> Option> { - let (index_min, index_max) = TableCodec::index_meta_bound(name); - let mut index_metas = vec![]; - let mut index_iter = tx - .iter(Bound::Included(&index_min), Bound::Included(&index_max)) - .ok()?; - - while let Some((_, value_option)) = index_iter.try_next().ok().flatten() { - if let Some(value) = value_option { - if let Some(index_meta) = TableCodec::decode_index_meta(&value).ok() { - index_metas.push(Arc::new(index_meta)); - } - } - } - - Some(index_metas) - } - - fn _drop_data(table: &mut KipTransaction, min: &[u8], max: &[u8]) -> Result<(), StorageError> { - let mut iter = table - .tx - .iter(Bound::Included(&min), Bound::Included(&max))?; - let mut data_keys = vec![]; - - while let Some((key, value_option)) = iter.try_next()? { - if value_option.is_some() { - data_keys.push(key); - } - } - drop(iter); - - for key in data_keys { - table.tx.remove(&key)? - } - - Ok(()) - } - - fn create_index_meta_for_table( - tx: &mut mvcc::Transaction, - table: &mut TableCatalog, - ) -> Result<(), StorageError> { - let table_name = table.name.clone(); - - for col in table - .all_columns() - .into_iter() - .filter(|col| col.desc.is_unique) - { - if let Some(col_id) = col.id { - let meta = IndexMeta { - id: 0, - column_ids: vec![col_id], - name: format!("uk_{}", col.name), - is_unique: true, - }; - let meta_ref = table.add_index_meta(meta); - let (key, value) = TableCodec::encode_index_meta(&table_name, meta_ref)?; - - tx.set(key, value); - } - } - Ok(()) - } } #[async_trait] impl Storage for KipStorage { type TransactionType = KipTransaction; - async fn create_table( - &self, - table_name: TableName, - columns: Vec, - ) -> Result { - let mut tx = self.inner.new_transaction().await; - let mut table_catalog = TableCatalog::new(table_name.clone(), columns)?; - - Self::create_index_meta_for_table(&mut tx, &mut table_catalog)?; - - for (_, column) in &table_catalog.columns { - let (key, value) = TableCodec::encode_column(column)?; - tx.set(key, value); - } - - let (k, v) = TableCodec::encode_root_table(&table_name)?; - self.inner.set(k, v).await?; - - tx.commit().await?; - self.cache.put(table_name.to_string(), table_catalog); - - Ok(table_name) - } - - async fn drop_table(&self, name: &String) -> Result<(), StorageError> { - self.drop_data(name).await?; - - let (min, max) = TableCodec::columns_bound(name); - let mut tx = self.inner.new_transaction().await; - let mut iter = tx.iter(Bound::Included(&min), Bound::Included(&max))?; - let mut col_keys = vec![]; - - while let Some((key, value_option)) = iter.try_next()? { - if value_option.is_some() { - col_keys.push(key); - } - } - drop(iter); - - for col_key in col_keys { - tx.remove(&col_key)? - } - tx.remove(&TableCodec::encode_root_table_key(name))?; - tx.commit().await?; - - let _ = self.cache.remove(name); - - Ok(()) - } - - async fn drop_data(&self, name: &String) -> Result<(), StorageError> { - if let Some(mut transaction) = self.transaction(name).await { - let (tuple_min, tuple_max) = transaction.table_codec.tuple_bound(); - Self::_drop_data(&mut transaction, &tuple_min, &tuple_max)?; - - let (index_min, index_max) = transaction.table_codec.all_index_bound(); - Self::_drop_data(&mut transaction, &index_min, &index_max)?; - - transaction.tx.commit().await?; - } - - Ok(()) - } - - async fn transaction(&self, name: &String) -> Option { - let table_codec = self.table(name).await.map(|catalog| TableCodec { - table: catalog.clone(), - })?; + async fn transaction(&self) -> Result { let tx = self.inner.new_transaction().await; - Some(KipTransaction { table_codec, tx }) - } - - async fn table(&self, name: &String) -> Option<&TableCatalog> { - let mut option = self.cache.get(name); - - if option.is_none() { - let tx = self.inner.new_transaction().await; - // TODO: unify the data into a `Meta` prefix and use one iteration to collect all data - let (columns, name_option) = Self::column_collect(name, &tx).ok()?; - let indexes = Self::index_meta_collect(name, &tx)?; - - if let Some(catalog) = name_option.and_then(|table_name| { - TableCatalog::new_with_indexes(table_name, columns, indexes).ok() - }) { - option = self - .cache - .get_or_insert(name.to_string(), |_| Ok(catalog)) - .ok(); - } - } - - option - } - - async fn show_tables(&self) -> Result, StorageError> { - let mut tables = vec![]; - let (min, max) = TableCodec::root_table_bound(); - - let tx = self.inner.new_transaction().await; - let mut iter = tx.iter(Bound::Included(&min), Bound::Included(&max))?; - - while let Some((_, value_option)) = iter.try_next().ok().flatten() { - if let Some(value) = value_option { - let table_name = TableCodec::decode_root_table(&value)?; - - tables.push(table_name); - } - } - - Ok(tables) + Ok(KipTransaction { + tx, + cache: ShardingLruCache::new(32, 16, RandomState::default())?, + }) } } pub struct KipTransaction { - table_codec: TableCodec, tx: mvcc::Transaction, + cache: ShardingLruCache, } #[async_trait] @@ -261,28 +62,35 @@ impl Transaction for KipTransaction { fn read( &self, + table_name: &String, bounds: Bounds, projections: Projections, ) -> Result, StorageError> { - let (min, max) = self.table_codec.tuple_bound(); + let all_columns = self + .table(table_name) + .ok_or(StorageError::TableNotFound)? + .all_columns(); + let (min, max) = TableCodec::tuple_bound(table_name); let iter = self.tx.iter(Bound::Included(&min), Bound::Included(&max))?; Ok(KipIter { offset: bounds.0.unwrap_or(0), limit: bounds.1, projections, - table_codec: &self.table_codec, + all_columns, iter, }) } fn read_by_index( &self, + table_name: &String, (offset_option, mut limit_option): Bounds, projections: Projections, index_meta: IndexMetaRef, binaries: Vec, ) -> Result, StorageError> { + let table = self.table(table_name).ok_or(StorageError::TableNotFound)?; let mut tuple_ids = Vec::new(); let mut offset = offset_option.unwrap_or(0); @@ -293,7 +101,7 @@ impl Transaction for KipTransaction { match binary { ConstantBinary::Scope { min, max } => { - let mut iter = self.scope_to_iter(&index_meta, min, max)?; + let mut iter = self.scope_to_iter(table_name, &index_meta, min, max)?; while let Some((_, value_option)) = iter.try_next()? { if let Some(value) = value_option { @@ -320,7 +128,7 @@ impl Transaction for KipTransaction { continue; } - let key = self.val_to_key(&index_meta, val)?; + let key = Self::val_to_key(table_name, &index_meta, val)?; if let Some(bytes) = self.tx.get(&key)? { tuple_ids.append(&mut TableCodec::decode_index(&bytes)?) @@ -334,7 +142,7 @@ impl Transaction for KipTransaction { Ok(IndexIter { projections, - table_codec: &self.table_codec, + table, tuple_ids: VecDeque::from(tuple_ids), tx: &self.tx, }) @@ -342,11 +150,12 @@ impl Transaction for KipTransaction { fn add_index( &mut self, + table_name: &String, index: Index, tuple_ids: Vec, is_unique: bool, ) -> Result<(), StorageError> { - let (key, value) = self.table_codec.encode_index(&index, &tuple_ids)?; + let (key, value) = TableCodec::encode_index(table_name, &index, &tuple_ids)?; if let Some(bytes) = self.tx.get(&key)? { if is_unique { @@ -367,16 +176,21 @@ impl Transaction for KipTransaction { Ok(()) } - fn del_index(&mut self, index: &Index) -> Result<(), StorageError> { - let key = self.table_codec.encode_index_key(&index)?; + fn del_index(&mut self, table_name: &String, index: &Index) -> Result<(), StorageError> { + let key = TableCodec::encode_index_key(table_name, &index)?; self.tx.remove(&key)?; Ok(()) } - fn append(&mut self, tuple: Tuple, is_overwrite: bool) -> Result<(), StorageError> { - let (key, value) = self.table_codec.encode_tuple(&tuple)?; + fn append( + &mut self, + table_name: &String, + tuple: Tuple, + is_overwrite: bool, + ) -> Result<(), StorageError> { + let (key, value) = TableCodec::encode_tuple(table_name, &tuple)?; if !is_overwrite && self.tx.get(&key)?.is_some() { return Err(StorageError::DuplicatePrimaryKey); @@ -386,13 +200,106 @@ impl Transaction for KipTransaction { Ok(()) } - fn delete(&mut self, tuple_id: TupleId) -> Result<(), StorageError> { - let key = self.table_codec.encode_tuple_key(&tuple_id)?; + fn delete(&mut self, table_name: &String, tuple_id: TupleId) -> Result<(), StorageError> { + let key = TableCodec::encode_tuple_key(table_name, &tuple_id)?; self.tx.remove(&key)?; Ok(()) } + fn create_table( + &mut self, + table_name: TableName, + columns: Vec, + ) -> Result { + let mut table_catalog = TableCatalog::new(table_name.clone(), columns)?; + + Self::create_index_meta_for_table(&mut self.tx, &mut table_catalog)?; + + for (_, column) in &table_catalog.columns { + let (key, value) = TableCodec::encode_column(column)?; + self.tx.set(key, value); + } + let (table_key, value) = TableCodec::encode_root_table(&table_name)?; + self.tx.set(table_key, value); + + self.cache.put(table_name.to_string(), table_catalog); + + Ok(table_name) + } + + fn drop_table(&mut self, table_name: &String) -> Result<(), StorageError> { + self.drop_data(table_name)?; + + let (min, max) = TableCodec::columns_bound(table_name); + let mut iter = self.tx.iter(Bound::Included(&min), Bound::Included(&max))?; + let mut col_keys = vec![]; + + while let Some((key, value_option)) = iter.try_next()? { + if value_option.is_some() { + col_keys.push(key); + } + } + drop(iter); + + for col_key in col_keys { + self.tx.remove(&col_key)? + } + self.tx + .remove(&TableCodec::encode_root_table_key(table_name))?; + + let _ = self.cache.remove(table_name); + + Ok(()) + } + + fn drop_data(&mut self, table_name: &String) -> Result<(), StorageError> { + let (tuple_min, tuple_max) = TableCodec::tuple_bound(table_name); + Self::_drop_data(&mut self.tx, &tuple_min, &tuple_max)?; + + let (index_min, index_max) = TableCodec::all_index_bound(table_name); + Self::_drop_data(&mut self.tx, &index_min, &index_max)?; + + Ok(()) + } + + fn table(&self, table_name: &String) -> Option<&TableCatalog> { + let mut option = self.cache.get(table_name); + + if option.is_none() { + // TODO: unify the data into a `Meta` prefix and use one iteration to collect all data + let (columns, name_option) = Self::column_collect(table_name, &self.tx).ok()?; + let indexes = Self::index_meta_collect(table_name, &self.tx)?; + + if let Some(catalog) = name_option.and_then(|table_name| { + TableCatalog::new_with_indexes(table_name, columns, indexes).ok() + }) { + option = self + .cache + .get_or_insert(table_name.to_string(), |_| Ok(catalog)) + .ok(); + } + } + + option + } + + fn show_tables(&self) -> Result, StorageError> { + let mut tables = vec![]; + let (min, max) = TableCodec::root_table_bound(); + let mut iter = self.tx.iter(Bound::Included(&min), Bound::Included(&max))?; + + while let Some((_, value_option)) = iter.try_next().ok().flatten() { + if let Some(value) = value_option { + let table_name = TableCodec::decode_root_table(&value)?; + + tables.push(table_name); + } + } + + Ok(tables) + } + async fn commit(self) -> Result<(), StorageError> { self.tx.commit().await?; @@ -401,22 +308,35 @@ impl Transaction for KipTransaction { } impl KipTransaction { - fn val_to_key(&self, index_meta: &IndexMetaRef, val: ValueRef) -> Result, TypeError> { + fn val_to_key( + table_name: &String, + index_meta: &IndexMetaRef, + val: ValueRef, + ) -> Result, TypeError> { let index = Index::new(index_meta.id, vec![val]); - self.table_codec.encode_index_key(&index) + TableCodec::encode_index_key(table_name, &index) } fn scope_to_iter( &self, + table_name: &String, index_meta: &IndexMetaRef, min: Bound, max: Bound, ) -> Result { let bound_encode = |bound: Bound| -> Result<_, StorageError> { match bound { - Bound::Included(val) => Ok(Bound::Included(self.val_to_key(&index_meta, val)?)), - Bound::Excluded(val) => Ok(Bound::Excluded(self.val_to_key(&index_meta, val)?)), + Bound::Included(val) => Ok(Bound::Included(Self::val_to_key( + table_name, + &index_meta, + val, + )?)), + Bound::Excluded(val) => Ok(Bound::Excluded(Self::val_to_key( + table_name, + &index_meta, + val, + )?)), Bound::Unbounded => Ok(Bound::Unbounded), } }; @@ -425,7 +345,7 @@ impl KipTransaction { let _ = mem::replace(value, Bound::Included(bound)); } }; - let (bound_min, bound_max) = self.table_codec.index_bound(&index_meta.id); + let (bound_min, bound_max) = TableCodec::index_bound(table_name, &index_meta.id); let mut encode_min = bound_encode(min)?; check_bound(&mut encode_min, bound_min); @@ -458,13 +378,103 @@ impl KipTransaction { false } + + fn column_collect( + name: &String, + tx: &mvcc::Transaction, + ) -> Result<(Vec, Option), StorageError> { + let (column_min, column_max) = TableCodec::columns_bound(name); + let mut column_iter = + tx.iter(Bound::Included(&column_min), Bound::Included(&column_max))?; + + let mut columns = vec![]; + let mut name_option = None; + + while let Some((_, value_option)) = column_iter.try_next().ok().flatten() { + if let Some(value) = value_option { + let (table_name, column) = TableCodec::decode_column(&value)?; + + if name != table_name.as_str() { + return Ok((vec![], None)); + } + let _ = name_option.insert(table_name); + + columns.push(column); + } + } + + Ok((columns, name_option)) + } + + fn index_meta_collect(name: &String, tx: &mvcc::Transaction) -> Option> { + let (index_min, index_max) = TableCodec::index_meta_bound(name); + let mut index_metas = vec![]; + let mut index_iter = tx + .iter(Bound::Included(&index_min), Bound::Included(&index_max)) + .ok()?; + + while let Some((_, value_option)) = index_iter.try_next().ok().flatten() { + if let Some(value) = value_option { + if let Some(index_meta) = TableCodec::decode_index_meta(&value).ok() { + index_metas.push(Arc::new(index_meta)); + } + } + } + + Some(index_metas) + } + + fn _drop_data(tx: &mut mvcc::Transaction, min: &[u8], max: &[u8]) -> Result<(), StorageError> { + let mut iter = tx.iter(Bound::Included(&min), Bound::Included(&max))?; + let mut data_keys = vec![]; + + while let Some((key, value_option)) = iter.try_next()? { + if value_option.is_some() { + data_keys.push(key); + } + } + drop(iter); + + for key in data_keys { + tx.remove(&key)? + } + + Ok(()) + } + + fn create_index_meta_for_table( + tx: &mut mvcc::Transaction, + table: &mut TableCatalog, + ) -> Result<(), StorageError> { + let table_name = table.name.clone(); + + for col in table + .all_columns() + .into_iter() + .filter(|col| col.desc.is_unique) + { + if let Some(col_id) = col.id { + let meta = IndexMeta { + id: 0, + column_ids: vec![col_id], + name: format!("uk_{}", col.name), + is_unique: true, + }; + let meta_ref = table.add_index_meta(meta); + let (key, value) = TableCodec::encode_index_meta(&table_name, meta_ref)?; + + tx.set(key, value); + } + } + Ok(()) + } } pub struct KipIter<'a> { offset: usize, limit: Option, projections: Projections, - table_codec: &'a TableCodec, + all_columns: Vec, iter: TransactionIter<'a>, } @@ -486,7 +496,7 @@ impl Iter for KipIter<'_> { let tuple = tuple_projection( &mut self.limit, &self.projections, - self.table_codec.decode_tuple(&value), + TableCodec::decode_tuple(self.all_columns.clone(), &value), )?; return Ok(Some(tuple)); @@ -504,9 +514,8 @@ mod test { use crate::expression::simplify::ConstantBinary; use crate::expression::ScalarExpression; use crate::storage::kip::KipStorage; - use crate::storage::memory::test::data_filling; - use crate::storage::table_codec::TableCodec; use crate::storage::{IndexIter, Iter, Storage, StorageError, Transaction}; + use crate::types::tuple::Tuple; use crate::types::value::DataValue; use crate::types::LogicalType; use itertools::Itertools; @@ -518,6 +527,7 @@ mod test { async fn test_in_kipdb_storage_works_with_data() -> Result<(), StorageError> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = KipStorage::new(temp_dir.path()).await?; + let mut transaction = storage.transaction().await?; let columns = vec![ Arc::new(ColumnCatalog::new( "c1".to_string(), @@ -537,21 +547,42 @@ mod test { .iter() .map(|col_ref| ColumnCatalog::clone(&col_ref)) .collect_vec(); - let table_id = storage - .create_table(Arc::new("test".to_string()), source_columns) - .await?; + let _ = transaction.create_table(Arc::new("test".to_string()), source_columns)?; - let table_catalog = storage.table(&"test".to_string()).await; + let table_catalog = transaction.table(&"test".to_string()); assert!(table_catalog.is_some()); assert!(table_catalog .unwrap() .get_column_id_by_name(&"c1".to_string()) .is_some()); - let mut transaction = storage.transaction(&table_id).await.unwrap(); - data_filling(columns, &mut transaction)?; + transaction.append( + &"test".to_string(), + Tuple { + id: Some(Arc::new(DataValue::Int32(Some(1)))), + columns: columns.clone(), + values: vec![ + Arc::new(DataValue::Int32(Some(1))), + Arc::new(DataValue::Boolean(Some(true))), + ], + }, + false, + )?; + transaction.append( + &"test".to_string(), + Tuple { + id: Some(Arc::new(DataValue::Int32(Some(2)))), + columns, + values: vec![ + Arc::new(DataValue::Int32(Some(2))), + Arc::new(DataValue::Boolean(Some(false))), + ], + }, + false, + )?; let mut iter = transaction.read( + &"test".to_string(), (Some(1), Some(1)), vec![ScalarExpression::InputRef { index: 0, @@ -580,20 +611,14 @@ mod test { let _ = kipsql .run("insert into t1 (a) values (0), (1), (2)") .await?; + let transaction = kipsql.storage.transaction().await?; - let table = kipsql - .storage - .table(&"t1".to_string()) - .await - .unwrap() - .clone(); + let table = transaction.table(&"t1".to_string()).unwrap().clone(); let projections = table .all_columns() .into_iter() .map(|col| ScalarExpression::ColumnRef(col)) .collect_vec(); - let codec = TableCodec { table }; - let tx = kipsql.storage.transaction(&"t1".to_string()).await.unwrap(); let tuple_ids = vec![ Arc::new(DataValue::Int32(Some(0))), Arc::new(DataValue::Int32(Some(1))), @@ -601,9 +626,9 @@ mod test { ]; let mut iter = IndexIter { projections, - table_codec: &codec, + table: &table, tuple_ids: VecDeque::from(tuple_ids.clone()), - tx: &tx.tx, + tx: &transaction.tx, }; let mut result = Vec::new(); @@ -620,28 +645,23 @@ mod test { async fn test_read_by_index() -> Result<(), DatabaseError> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let kipsql = Database::with_kipdb(temp_dir.path()).await?; - let _ = kipsql .run("create table t1 (a int primary key, b int unique)") .await?; let _ = kipsql .run("insert into t1 (a, b) values (0, 0), (1, 1), (2, 2)") .await?; + let transaction = kipsql.storage.transaction().await.unwrap(); - let table = kipsql - .storage - .table(&"t1".to_string()) - .await - .unwrap() - .clone(); + let table = transaction.table(&"t1".to_string()).unwrap().clone(); let projections = table .all_columns() .into_iter() .map(|col| ScalarExpression::ColumnRef(col)) .collect_vec(); - let transaction = kipsql.storage.transaction(&"t1".to_string()).await.unwrap(); let mut iter = transaction .read_by_index( + &"t1".to_string(), (Some(0), Some(1)), projections, table.indexes[0].clone(), diff --git a/src/storage/memory.rs b/src/storage/memory.rs deleted file mode 100644 index b1053523..00000000 --- a/src/storage/memory.rs +++ /dev/null @@ -1,323 +0,0 @@ -use crate::catalog::{ColumnCatalog, RootCatalog, TableCatalog, TableName}; -use crate::expression::simplify::ConstantBinary; -use crate::storage::{ - tuple_projection, Bounds, IndexIter, Iter, Projections, Storage, StorageError, Transaction, -}; -use crate::types::index::{Index, IndexMetaRef}; -use crate::types::tuple::{Tuple, TupleId}; -use async_trait::async_trait; -use std::cell::Cell; -use std::fmt::{Debug, Formatter}; -use std::slice; -use std::sync::Arc; - -// WARRING: Only single-threaded and tested using -#[derive(Clone)] -pub struct MemStorage { - inner: Arc>, -} - -unsafe impl Send for MemStorage {} - -unsafe impl Sync for MemStorage {} - -impl MemStorage { - pub fn new() -> MemStorage { - Self { - inner: Arc::new(Cell::new(StorageInner { - root: Default::default(), - tables: Default::default(), - })), - } - } - - pub fn root(self, root: RootCatalog) -> Self { - unsafe { - self.inner.as_ptr().as_mut().unwrap().root = root; - } - self - } -} - -#[derive(Debug)] -struct StorageInner { - root: RootCatalog, - tables: Vec<(TableName, MemTable)>, -} - -#[async_trait] -impl Storage for MemStorage { - type TransactionType = MemTable; - - async fn create_table( - &self, - table_name: TableName, - columns: Vec, - ) -> Result { - let new_table = MemTable { - tuples: Arc::new(Cell::new(vec![])), - }; - let inner = unsafe { self.inner.as_ptr().as_mut() }.unwrap(); - - let table_id = inner.root.add_table(table_name.clone(), columns)?; - inner.tables.push((table_name, new_table)); - - Ok(table_id) - } - - async fn drop_table(&self, name: &String) -> Result<(), StorageError> { - let inner = unsafe { self.inner.as_ptr().as_mut().unwrap() }; - - inner.root.drop_table(&name)?; - - Ok(()) - } - - async fn drop_data(&self, name: &String) -> Result<(), StorageError> { - let inner = unsafe { self.inner.as_ptr().as_mut().unwrap() }; - - inner.tables.retain(|(t_name, _)| t_name.as_str() != name); - - Ok(()) - } - - async fn transaction(&self, name: &String) -> Option { - unsafe { - self.inner - .as_ptr() - .as_ref() - .unwrap() - .tables - .iter() - .find(|(tname, _)| tname.as_str() == name) - .map(|(_, table)| table.clone()) - } - } - - async fn table(&self, name: &String) -> Option<&TableCatalog> { - unsafe { self.inner.as_ptr().as_ref().unwrap().root.get_table(name) } - } - - async fn show_tables(&self) -> Result, StorageError> { - todo!() - } -} - -unsafe impl Send for MemTable {} - -unsafe impl Sync for MemTable {} - -#[derive(Clone)] -pub struct MemTable { - tuples: Arc>>, -} - -impl Debug for MemTable { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - unsafe { - f.debug_struct("MemTable") - .field("{:?}", self.tuples.as_ptr().as_ref().unwrap()) - .finish() - } - } -} - -#[async_trait] -impl Transaction for MemTable { - type IterType<'a> = MemTraction<'a>; - - fn read( - &self, - bounds: Bounds, - projection: Projections, - ) -> Result, StorageError> { - unsafe { - Ok(MemTraction { - offset: bounds.0.unwrap_or(0), - limit: bounds.1, - projections: projection, - iter: self.tuples.as_ptr().as_ref().unwrap().iter(), - }) - } - } - - #[allow(unused_variables)] - fn read_by_index( - &self, - bounds: Bounds, - projection: Projections, - index_meta: IndexMetaRef, - binaries: Vec, - ) -> Result, StorageError> { - todo!() - } - - #[allow(unused_variables)] - fn add_index( - &mut self, - index: Index, - tuple_ids: Vec, - is_unique: bool, - ) -> Result<(), StorageError> { - todo!() - } - - fn del_index(&mut self, _index: &Index) -> Result<(), StorageError> { - todo!() - } - - fn append(&mut self, tuple: Tuple, is_overwrite: bool) -> Result<(), StorageError> { - let tuples = unsafe { self.tuples.as_ptr().as_mut() }.unwrap(); - - if let Some(original_tuple) = tuples.iter_mut().find(|t| t.id == tuple.id) { - if !is_overwrite { - return Err(StorageError::DuplicatePrimaryKey); - } - *original_tuple = tuple; - } else { - tuples.push(tuple); - } - - Ok(()) - } - - fn delete(&mut self, tuple_id: TupleId) -> Result<(), StorageError> { - let tuples = unsafe { self.tuples.as_ptr().as_mut() }.unwrap(); - - tuples.retain(|tuple| tuple.id.clone().unwrap() != tuple_id); - - Ok(()) - } - - async fn commit(self) -> Result<(), StorageError> { - Ok(()) - } -} - -pub struct MemTraction<'a> { - offset: usize, - limit: Option, - projections: Projections, - iter: slice::Iter<'a, Tuple>, -} - -impl Iter for MemTraction<'_> { - fn next_tuple(&mut self) -> Result, StorageError> { - while self.offset > 0 { - let _ = self.iter.next(); - self.offset -= 1; - } - - if let Some(num) = self.limit { - if num == 0 { - return Ok(None); - } - } - - self.iter - .next() - .cloned() - .map(|tuple| tuple_projection(&mut self.limit, &self.projections, tuple)) - .transpose() - } -} - -#[cfg(test)] -pub(crate) mod test { - use crate::catalog::{ColumnCatalog, ColumnDesc, ColumnRef}; - use crate::expression::ScalarExpression; - use crate::storage::memory::MemStorage; - use crate::storage::{Iter, Storage, StorageError, Transaction}; - use crate::types::tuple::Tuple; - use crate::types::value::DataValue; - use crate::types::LogicalType; - use itertools::Itertools; - use std::sync::Arc; - - pub fn data_filling( - columns: Vec, - table: &mut impl Transaction, - ) -> Result<(), StorageError> { - table.append( - Tuple { - id: Some(Arc::new(DataValue::Int32(Some(1)))), - columns: columns.clone(), - values: vec![ - Arc::new(DataValue::Int32(Some(1))), - Arc::new(DataValue::Boolean(Some(true))), - ], - }, - false, - )?; - table.append( - Tuple { - id: Some(Arc::new(DataValue::Int32(Some(2)))), - columns: columns.clone(), - values: vec![ - Arc::new(DataValue::Int32(Some(2))), - Arc::new(DataValue::Boolean(Some(false))), - ], - }, - false, - )?; - - Ok(()) - } - - #[tokio::test] - async fn test_in_memory_storage_works_with_data() -> Result<(), StorageError> { - let storage = MemStorage::new(); - let columns = vec![ - Arc::new(ColumnCatalog::new( - "c1".to_string(), - false, - ColumnDesc::new(LogicalType::Integer, true, false), - None, - )), - Arc::new(ColumnCatalog::new( - "c2".to_string(), - false, - ColumnDesc::new(LogicalType::Boolean, false, false), - None, - )), - ]; - - let source_columns = columns - .iter() - .map(|col_ref| ColumnCatalog::clone(&col_ref)) - .collect_vec(); - - let table_id = storage - .create_table(Arc::new("test".to_string()), source_columns) - .await?; - - let table_catalog = storage.table(&"test".to_string()).await; - assert!(table_catalog.is_some()); - assert!(table_catalog - .unwrap() - .get_column_id_by_name(&"c1".to_string()) - .is_some()); - - let mut transaction = storage.transaction(&table_id).await.unwrap(); - data_filling(columns, &mut transaction)?; - - let mut iter = transaction.read( - (Some(1), Some(1)), - vec![ScalarExpression::InputRef { - index: 0, - ty: LogicalType::Integer, - }], - )?; - - let option_1 = iter.next_tuple()?; - assert_eq!( - option_1.unwrap().id, - Some(Arc::new(DataValue::Int32(Some(2)))) - ); - - let option_2 = iter.next_tuple()?; - assert_eq!(option_2, None); - - Ok(()) - } -} diff --git a/src/storage/mod.rs b/src/storage/mod.rs index a87eef1e..58d37a9f 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -1,5 +1,4 @@ pub mod kip; -pub mod memory; mod table_codec; use crate::catalog::{CatalogError, ColumnCatalog, TableCatalog, TableName}; @@ -20,19 +19,7 @@ use std::ops::SubAssign; pub trait Storage: Sync + Send + Clone + 'static { type TransactionType: Transaction; - async fn create_table( - &self, - table_name: TableName, - columns: Vec, - ) -> Result; - - async fn drop_table(&self, name: &String) -> Result<(), StorageError>; - async fn drop_data(&self, name: &String) -> Result<(), StorageError>; - - async fn transaction(&self, name: &String) -> Option; - async fn table(&self, name: &String) -> Option<&TableCatalog>; - - async fn show_tables(&self) -> Result, StorageError>; + async fn transaction(&self) -> Result; } /// Optional bounds of the reader, of the form (offset, limit). @@ -48,12 +35,14 @@ pub trait Transaction: Sync + Send + 'static { /// The projections is column indices. fn read( &self, + table_name: &String, bounds: Bounds, projection: Projections, ) -> Result, StorageError>; fn read_by_index( &self, + table_name: &String, bounds: Bounds, projection: Projections, index_meta: IndexMetaRef, @@ -62,16 +51,34 @@ pub trait Transaction: Sync + Send + 'static { fn add_index( &mut self, + table_name: &String, index: Index, tuple_ids: Vec, is_unique: bool, ) -> Result<(), StorageError>; - fn del_index(&mut self, index: &Index) -> Result<(), StorageError>; + fn del_index(&mut self, table_name: &String, index: &Index) -> Result<(), StorageError>; + + fn append( + &mut self, + table_name: &String, + tuple: Tuple, + is_overwrite: bool, + ) -> Result<(), StorageError>; + + fn delete(&mut self, table_name: &String, tuple_id: TupleId) -> Result<(), StorageError>; - fn append(&mut self, tuple: Tuple, is_overwrite: bool) -> Result<(), StorageError>; + fn create_table( + &mut self, + table_name: TableName, + columns: Vec, + ) -> Result; + + fn drop_table(&mut self, table_name: &String) -> Result<(), StorageError>; + fn drop_data(&mut self, table_name: &String) -> Result<(), StorageError>; + fn table(&self, table_name: &String) -> Option<&TableCatalog>; - fn delete(&mut self, tuple_id: TupleId) -> Result<(), StorageError>; + fn show_tables(&self) -> Result, StorageError>; async fn commit(self) -> Result<(), StorageError>; } @@ -79,7 +86,7 @@ pub trait Transaction: Sync + Send + 'static { // TODO: Table return optimization pub struct IndexIter<'a> { projections: Projections, - table_codec: &'a TableCodec, + table: &'a TableCatalog, tuple_ids: VecDeque, tx: &'a mvcc::Transaction, } @@ -87,17 +94,15 @@ pub struct IndexIter<'a> { impl Iter for IndexIter<'_> { fn next_tuple(&mut self) -> Result, StorageError> { if let Some(tuple_id) = self.tuple_ids.pop_front() { - let key = self.table_codec.encode_tuple_key(&tuple_id)?; + let key = TableCodec::encode_tuple_key(&self.table.name, &tuple_id)?; Ok(self .tx .get(&key)? .map(|bytes| { - tuple_projection( - &mut None, - &self.projections, - self.table_codec.decode_tuple(&bytes), - ) + let tuple = TableCodec::decode_tuple(self.table.all_columns(), &bytes); + + tuple_projection(&mut None, &self.projections, tuple) }) .transpose()?) } else { @@ -154,6 +159,9 @@ pub enum StorageError { #[error("The column has been declared unique and the value already exists")] DuplicateUniqueValue, + + #[error("The table not found")] + TableNotFound, } impl From for StorageError { diff --git a/src/storage/table_codec.rs b/src/storage/table_codec.rs index a2819dfc..d03beb01 100644 --- a/src/storage/table_codec.rs +++ b/src/storage/table_codec.rs @@ -1,4 +1,4 @@ -use crate::catalog::{ColumnCatalog, TableCatalog, TableName}; +use crate::catalog::{ColumnCatalog, ColumnRef, TableName}; use crate::types::errors::TypeError; use crate::types::index::{Index, IndexId, IndexMeta}; use crate::types::tuple::{Tuple, TupleId}; @@ -12,9 +12,7 @@ lazy_static! { } #[derive(Clone)] -pub struct TableCodec { - pub table: TableCatalog, -} +pub struct TableCodec {} #[derive(Copy, Clone)] enum CodecType { @@ -57,9 +55,9 @@ impl TableCodec { table_bytes } - pub fn tuple_bound(&self) -> (Vec, Vec) { + pub fn tuple_bound(name: &String) -> (Vec, Vec) { let op = |bound_id| { - let mut key_prefix = Self::key_prefix(CodecType::Tuple, &self.table.name); + let mut key_prefix = Self::key_prefix(CodecType::Tuple, name); key_prefix.push(bound_id); key_prefix @@ -79,9 +77,9 @@ impl TableCodec { (op(BOUND_MIN_TAG), op(BOUND_MAX_TAG)) } - pub fn index_bound(&self, index_id: &IndexId) -> (Vec, Vec) { + pub fn index_bound(name: &String, index_id: &IndexId) -> (Vec, Vec) { let op = |bound_id| { - let mut key_prefix = Self::key_prefix(CodecType::Index, &self.table.name); + let mut key_prefix = Self::key_prefix(CodecType::Index, name); key_prefix.push(BOUND_MIN_TAG); key_prefix.append(&mut index_id.to_be_bytes().to_vec()); @@ -92,9 +90,9 @@ impl TableCodec { (op(BOUND_MIN_TAG), op(BOUND_MAX_TAG)) } - pub fn all_index_bound(&self) -> (Vec, Vec) { + pub fn all_index_bound(name: &String) -> (Vec, Vec) { let op = |bound_id| { - let mut key_prefix = Self::key_prefix(CodecType::Index, &self.table.name); + let mut key_prefix = Self::key_prefix(CodecType::Index, name); key_prefix.push(bound_id); key_prefix @@ -116,7 +114,7 @@ impl TableCodec { pub fn columns_bound(name: &String) -> (Vec, Vec) { let op = |bound_id| { - let mut key_prefix = Self::key_prefix(CodecType::Column, &name); + let mut key_prefix = Self::key_prefix(CodecType::Column, name); key_prefix.push(bound_id); key_prefix @@ -127,15 +125,15 @@ impl TableCodec { /// Key: TableName_Tuple_0_RowID(Sorted) /// Value: Tuple - pub fn encode_tuple(&self, tuple: &Tuple) -> Result<(Bytes, Bytes), TypeError> { - let tuple_id = tuple.id.clone().ok_or(TypeError::NotNull)?; - let key = self.encode_tuple_key(&tuple_id)?; + pub fn encode_tuple(name: &String, tuple: &Tuple) -> Result<(Bytes, Bytes), TypeError> { + let tuple_id = tuple.id.clone().ok_or(TypeError::PrimaryKeyNotFound)?; + let key = Self::encode_tuple_key(name, &tuple_id)?; Ok((Bytes::from(key), Bytes::from(tuple.serialize_to()))) } - pub fn encode_tuple_key(&self, tuple_id: &TupleId) -> Result, TypeError> { - let mut key_prefix = Self::key_prefix(CodecType::Tuple, &self.table.name); + pub fn encode_tuple_key(name: &String, tuple_id: &TupleId) -> Result, TypeError> { + let mut key_prefix = Self::key_prefix(CodecType::Tuple, name); key_prefix.push(BOUND_MIN_TAG); tuple_id.to_primary_key(&mut key_prefix)?; @@ -143,8 +141,8 @@ impl TableCodec { Ok(key_prefix) } - pub fn decode_tuple(&self, bytes: &[u8]) -> Tuple { - Tuple::deserialize_from(self.table.all_columns(), bytes) + pub fn decode_tuple(columns: Vec, bytes: &[u8]) -> Tuple { + Tuple::deserialize_from(columns, bytes) } /// Key: TableName_IndexMeta_0_IndexID @@ -178,11 +176,11 @@ impl TableCodec { /// Tips: The unique index has only one ColumnID and one corresponding DataValue, /// so it can be positioned directly. pub fn encode_index( - &self, + name: &String, index: &Index, tuple_ids: &[TupleId], ) -> Result<(Bytes, Bytes), TypeError> { - let key = self.encode_index_key(index)?; + let key = TableCodec::encode_index_key(name, index)?; Ok(( Bytes::from(key), @@ -190,8 +188,8 @@ impl TableCodec { )) } - pub fn encode_index_key(&self, index: &Index) -> Result, TypeError> { - let mut key_prefix = Self::key_prefix(CodecType::Index, &self.table.name); + pub fn encode_index_key(name: &String, index: &Index) -> Result, TypeError> { + let mut key_prefix = Self::key_prefix(CodecType::Index, name); key_prefix.push(BOUND_MIN_TAG); key_prefix.append(&mut index.id.to_be_bytes().to_vec()); key_prefix.push(BOUND_MIN_TAG); @@ -263,7 +261,7 @@ mod tests { use std::ops::Bound; use std::sync::Arc; - fn build_table_codec() -> (TableCatalog, TableCodec) { + fn build_table_codec() -> TableCatalog { let columns = vec![ ColumnCatalog::new( "c1".into(), @@ -278,16 +276,12 @@ mod tests { None, ), ]; - let table_catalog = TableCatalog::new(Arc::new("t1".to_string()), columns).unwrap(); - let codec = TableCodec { - table: table_catalog.clone(), - }; - (table_catalog, codec) + TableCatalog::new(Arc::new("t1".to_string()), columns).unwrap() } #[test] fn test_table_codec_tuple() -> Result<(), TypeError> { - let (table_catalog, codec) = build_table_codec(); + let table_catalog = build_table_codec(); let tuple = Tuple { id: Some(Arc::new(DataValue::Int32(Some(0)))), @@ -297,16 +291,19 @@ mod tests { Arc::new(DataValue::Decimal(Some(Decimal::new(1, 0)))), ], }; - let (_, bytes) = codec.encode_tuple(&tuple)?; + let (_, bytes) = TableCodec::encode_tuple(&table_catalog.name, &tuple)?; - assert_eq!(codec.decode_tuple(&bytes), tuple); + assert_eq!( + TableCodec::decode_tuple(table_catalog.all_columns(), &bytes), + tuple + ); Ok(()) } #[test] fn test_root_catalog() { - let (table_catalog, _) = build_table_codec(); + let table_catalog = build_table_codec(); let (_, bytes) = TableCodec::encode_root_table(&table_catalog.name).unwrap(); let table_name = TableCodec::decode_root_table(&bytes).unwrap(); @@ -331,14 +328,14 @@ mod tests { #[test] fn test_table_codec_index() -> Result<(), TypeError> { - let (_, codec) = build_table_codec(); + let table_catalog = build_table_codec(); let index = Index { id: 0, column_values: vec![Arc::new(DataValue::Int32(Some(0)))], }; let tuple_ids = vec![Arc::new(DataValue::Int32(Some(0)))]; - let (_, bytes) = codec.encode_index(&index, &tuple_ids)?; + let (_, bytes) = TableCodec::encode_index(&table_catalog.name, &index, &tuple_ids)?; assert_eq!(TableCodec::decode_index(&bytes)?, tuple_ids); @@ -347,7 +344,7 @@ mod tests { #[test] fn test_table_codec_column() { - let (table_catalog, _) = build_table_codec(); + let table_catalog = build_table_codec(); let col = table_catalog.all_columns()[0].clone(); let (_, bytes) = TableCodec::encode_column(&col).unwrap(); @@ -454,34 +451,32 @@ mod tests { #[test] fn test_table_codec_index_bound() { let mut set = BTreeSet::new(); - let table_codec = TableCodec { - table: TableCatalog::new(Arc::new("T0".to_string()), vec![]).unwrap(), - }; + let table_catalog = TableCatalog::new(Arc::new("T0".to_string()), vec![]).unwrap(); - let op = |value: DataValue, index_id: usize, table_codec: &TableCodec| { + let op = |value: DataValue, index_id: usize, table_name: &String| { let index = Index { id: index_id as u32, column_values: vec![Arc::new(value)], }; - table_codec.encode_index_key(&index).unwrap() + TableCodec::encode_index_key(table_name, &index).unwrap() }; - set.insert(op(DataValue::Int32(Some(0)), 0, &table_codec)); - set.insert(op(DataValue::Int32(Some(1)), 0, &table_codec)); - set.insert(op(DataValue::Int32(Some(2)), 0, &table_codec)); + set.insert(op(DataValue::Int32(Some(0)), 0, &table_catalog.name)); + set.insert(op(DataValue::Int32(Some(1)), 0, &table_catalog.name)); + set.insert(op(DataValue::Int32(Some(2)), 0, &table_catalog.name)); - set.insert(op(DataValue::Int32(Some(0)), 1, &table_codec)); - set.insert(op(DataValue::Int32(Some(1)), 1, &table_codec)); - set.insert(op(DataValue::Int32(Some(2)), 1, &table_codec)); + set.insert(op(DataValue::Int32(Some(0)), 1, &table_catalog.name)); + set.insert(op(DataValue::Int32(Some(1)), 1, &table_catalog.name)); + set.insert(op(DataValue::Int32(Some(2)), 1, &table_catalog.name)); - set.insert(op(DataValue::Int32(Some(0)), 2, &table_codec)); - set.insert(op(DataValue::Int32(Some(1)), 2, &table_codec)); - set.insert(op(DataValue::Int32(Some(2)), 2, &table_codec)); + set.insert(op(DataValue::Int32(Some(0)), 2, &table_catalog.name)); + set.insert(op(DataValue::Int32(Some(1)), 2, &table_catalog.name)); + set.insert(op(DataValue::Int32(Some(2)), 2, &table_catalog.name)); println!("{:#?}", set); - let (min, max) = table_codec.index_bound(&1); + let (min, max) = TableCodec::index_bound(&table_catalog.name, &1); println!("{:?}", min); println!("{:?}", max); @@ -495,9 +490,18 @@ mod tests { assert_eq!(vec.len(), 3); - assert_eq!(vec[0], &op(DataValue::Int32(Some(0)), 1, &table_codec)); - assert_eq!(vec[1], &op(DataValue::Int32(Some(1)), 1, &table_codec)); - assert_eq!(vec[2], &op(DataValue::Int32(Some(2)), 1, &table_codec)); + assert_eq!( + vec[0], + &op(DataValue::Int32(Some(0)), 1, &table_catalog.name) + ); + assert_eq!( + vec[1], + &op(DataValue::Int32(Some(1)), 1, &table_catalog.name) + ); + assert_eq!( + vec[2], + &op(DataValue::Int32(Some(2)), 1, &table_catalog.name) + ); } #[test] @@ -509,11 +513,7 @@ mod tests { column_values: vec![Arc::new(value)], }; - TableCodec { - table: TableCatalog::new(Arc::new(table_name.to_string()), vec![]).unwrap(), - } - .encode_index_key(&index) - .unwrap() + TableCodec::encode_index_key(&table_name.to_string(), &index).unwrap() }; set.insert(op(DataValue::Int32(Some(0)), 0, "T0")); @@ -528,10 +528,7 @@ mod tests { set.insert(op(DataValue::Int32(Some(1)), 0, "T2")); set.insert(op(DataValue::Int32(Some(2)), 0, "T2")); - let table_codec = TableCodec { - table: TableCatalog::new(Arc::new("T1".to_string()), vec![]).unwrap(), - }; - let (min, max) = table_codec.all_index_bound(); + let (min, max) = TableCodec::all_index_bound(&"T1".to_string()); let vec = set .range::, (Bound<&Vec>, Bound<&Vec>)>(( @@ -551,11 +548,7 @@ mod tests { fn test_table_codec_tuple_bound() { let mut set = BTreeSet::new(); let op = |tuple_id: DataValue, table_name: &str| { - TableCodec { - table: TableCatalog::new(Arc::new(table_name.to_string()), vec![]).unwrap(), - } - .encode_tuple_key(&Arc::new(tuple_id)) - .unwrap() + TableCodec::encode_tuple_key(&table_name.to_string(), &Arc::new(tuple_id)).unwrap() }; set.insert(op(DataValue::Int32(Some(0)), "T0")); @@ -570,10 +563,7 @@ mod tests { set.insert(op(DataValue::Int32(Some(1)), "T2")); set.insert(op(DataValue::Int32(Some(2)), "T2")); - let table_codec = TableCodec { - table: TableCatalog::new(Arc::new("T1".to_string()), vec![]).unwrap(), - }; - let (min, max) = table_codec.tuple_bound(); + let (min, max) = TableCodec::tuple_bound(&"T1".to_string()); let vec = set .range::, (Bound<&Vec>, Bound<&Vec>)>(( diff --git a/src/types/value.rs b/src/types/value.rs index aa8e32a5..c6284b54 100644 --- a/src/types/value.rs +++ b/src/types/value.rs @@ -521,7 +521,7 @@ impl DataValue { DataValue::Utf8(Some(v)) => Self::encode_bytes(b, v.as_bytes()), value => { return if value.is_null() { - Err(TypeError::NotNull) + Err(TypeError::PrimaryKeyNotFound) } else { Err(TypeError::InvalidType) }