diff --git a/Cargo.toml b/Cargo.toml index 5f854b1c..3c2ef149 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ [package] name = "fnck_sql" -version = "0.0.1-alpha.14" +version = "0.0.1-alpha.15" edition = "2021" authors = ["Kould ", "Xwg "] description = "SQL as a Function for Rust" diff --git a/src/bin/server.rs b/src/bin/server.rs index 4c8450b8..1e23dc89 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -2,7 +2,7 @@ use async_trait::async_trait; use clap::Parser; use fnck_sql::db::{DBTransaction, DataBaseBuilder, Database}; use fnck_sql::errors::DatabaseError; -use fnck_sql::storage::kip::KipStorage; +use fnck_sql::storage::kipdb::KipStorage; use fnck_sql::types::tuple::{Schema, Tuple}; use fnck_sql::types::LogicalType; use futures::stream; diff --git a/src/binder/create_table.rs b/src/binder/create_table.rs index 9af59f0f..cd54afc0 100644 --- a/src/binder/create_table.rs +++ b/src/binder/create_table.rs @@ -143,7 +143,7 @@ mod tests { use super::*; use crate::binder::BinderContext; use crate::catalog::ColumnDesc; - use crate::storage::kip::KipStorage; + use crate::storage::kipdb::KipStorage; use crate::storage::Storage; use crate::types::LogicalType; use sqlparser::ast::CharLengthUnits; diff --git a/src/binder/mod.rs b/src/binder/mod.rs index cad6ed08..d3221421 100644 --- a/src/binder/mod.rs +++ b/src/binder/mod.rs @@ -352,7 +352,7 @@ pub mod test { use crate::catalog::{ColumnCatalog, ColumnDesc}; use crate::errors::DatabaseError; use crate::planner::LogicalPlan; - use crate::storage::kip::KipStorage; + use crate::storage::kipdb::KipStorage; use crate::storage::{Storage, Transaction}; use crate::types::LogicalType::Integer; use std::path::PathBuf; diff --git a/src/db.rs b/src/db.rs index 3d243a2c..982c57c9 100644 --- a/src/db.rs +++ b/src/db.rs @@ -15,7 +15,7 @@ use crate::optimizer::rule::implementation::ImplementationRuleImpl; use crate::optimizer::rule::normalization::NormalizationRuleImpl; use crate::parser::parse_sql; use crate::planner::LogicalPlan; -use crate::storage::kip::KipStorage; +use crate::storage::kipdb::KipStorage; use crate::storage::{Storage, Transaction}; use crate::types::tuple::{SchemaRef, Tuple}; use crate::udf::current_date::CurrentDate; diff --git a/src/execution/volcano/dql/aggregate/hash_agg.rs b/src/execution/volcano/dql/aggregate/hash_agg.rs index 90a63dc4..8b4a5550 100644 --- a/src/execution/volcano/dql/aggregate/hash_agg.rs +++ b/src/execution/volcano/dql/aggregate/hash_agg.rs @@ -165,7 +165,7 @@ mod test { use crate::planner::operator::values::ValuesOperator; use crate::planner::operator::Operator; use crate::planner::LogicalPlan; - use crate::storage::kip::KipStorage; + use crate::storage::kipdb::KipStorage; use crate::storage::Storage; use crate::types::tuple::create_table; use crate::types::value::DataValue; diff --git a/src/execution/volcano/dql/join/hash_join.rs b/src/execution/volcano/dql/join/hash_join.rs index 9840d865..1d5b1435 100644 --- a/src/execution/volcano/dql/join/hash_join.rs +++ b/src/execution/volcano/dql/join/hash_join.rs @@ -383,7 +383,7 @@ mod test { use crate::planner::operator::values::ValuesOperator; use crate::planner::operator::Operator; use crate::planner::LogicalPlan; - use crate::storage::kip::KipStorage; + use crate::storage::kipdb::KipStorage; use crate::storage::Storage; use crate::types::value::DataValue; use crate::types::LogicalType; diff --git a/src/execution/volcano/dql/join/nested_loop_join.rs b/src/execution/volcano/dql/join/nested_loop_join.rs index 01baf31d..21dce92a 100644 --- 
a/src/execution/volcano/dql/join/nested_loop_join.rs +++ b/src/execution/volcano/dql/join/nested_loop_join.rs @@ -328,7 +328,7 @@ mod test { use crate::expression::ScalarExpression; use crate::planner::operator::values::ValuesOperator; use crate::planner::operator::Operator; - use crate::storage::kip::KipStorage; + use crate::storage::kipdb::KipStorage; use crate::storage::Storage; use crate::types::value::DataValue; use crate::types::LogicalType; diff --git a/src/expression/range_detacher.rs b/src/expression/range_detacher.rs index e1c3acaf..10039e0a 100644 --- a/src/expression/range_detacher.rs +++ b/src/expression/range_detacher.rs @@ -792,7 +792,7 @@ mod test { use crate::planner::operator::filter::FilterOperator; use crate::planner::operator::Operator; use crate::planner::LogicalPlan; - use crate::storage::kip::KipTransaction; + use crate::storage::kipdb::KipTransaction; use crate::types::value::DataValue; use std::ops::Bound; use std::sync::Arc; diff --git a/src/optimizer/core/memo.rs b/src/optimizer/core/memo.rs index fea4276b..cf926398 100644 --- a/src/optimizer/core/memo.rs +++ b/src/optimizer/core/memo.rs @@ -91,7 +91,7 @@ mod tests { use crate::optimizer::rule::implementation::ImplementationRuleImpl; use crate::optimizer::rule::normalization::NormalizationRuleImpl; use crate::planner::operator::PhysicalOption; - use crate::storage::kip::KipTransaction; + use crate::storage::kipdb::KipTransaction; use crate::storage::{Storage, Transaction}; use crate::types::index::{IndexInfo, IndexMeta, IndexType}; use crate::types::value::DataValue; diff --git a/src/optimizer/core/statistics_meta.rs b/src/optimizer/core/statistics_meta.rs index fdfe6e7d..2b941e78 100644 --- a/src/optimizer/core/statistics_meta.rs +++ b/src/optimizer/core/statistics_meta.rs @@ -3,8 +3,7 @@ use crate::errors::DatabaseError; use crate::expression::range_detacher::Range; use crate::optimizer::core::cm_sketch::CountMinSketch; use crate::optimizer::core::histogram::Histogram; -use crate::storage::kip::StatisticsMetaCache; -use crate::storage::Transaction; +use crate::storage::{StatisticsMetaCache, Transaction}; use crate::types::index::IndexId; use crate::types::value::DataValue; use serde::{Deserialize, Serialize}; diff --git a/src/optimizer/rule/normalization/column_pruning.rs b/src/optimizer/rule/normalization/column_pruning.rs index d86d16ee..ce9ad9ad 100644 --- a/src/optimizer/rule/normalization/column_pruning.rs +++ b/src/optimizer/rule/normalization/column_pruning.rs @@ -200,7 +200,7 @@ mod tests { use crate::optimizer::rule::normalization::NormalizationRuleImpl; use crate::planner::operator::join::JoinCondition; use crate::planner::operator::Operator; - use crate::storage::kip::KipTransaction; + use crate::storage::kipdb::KipTransaction; #[tokio::test] async fn test_column_pruning() -> Result<(), DatabaseError> { diff --git a/src/optimizer/rule/normalization/combine_operators.rs b/src/optimizer/rule/normalization/combine_operators.rs index 2b59760b..5e63c07a 100644 --- a/src/optimizer/rule/normalization/combine_operators.rs +++ b/src/optimizer/rule/normalization/combine_operators.rs @@ -153,7 +153,7 @@ mod tests { use crate::optimizer::heuristic::optimizer::HepOptimizer; use crate::optimizer::rule::normalization::NormalizationRuleImpl; use crate::planner::operator::Operator; - use crate::storage::kip::KipTransaction; + use crate::storage::kipdb::KipTransaction; use crate::types::value::DataValue; use crate::types::LogicalType; use std::sync::Arc; diff --git a/src/optimizer/rule/normalization/pushdown_limit.rs 
b/src/optimizer/rule/normalization/pushdown_limit.rs index 5df0beb0..b39f711a 100644 --- a/src/optimizer/rule/normalization/pushdown_limit.rs +++ b/src/optimizer/rule/normalization/pushdown_limit.rs @@ -192,7 +192,7 @@ mod tests { use crate::optimizer::rule::normalization::NormalizationRuleImpl; use crate::planner::operator::limit::LimitOperator; use crate::planner::operator::Operator; - use crate::storage::kip::KipTransaction; + use crate::storage::kipdb::KipTransaction; #[tokio::test] async fn test_limit_project_transpose() -> Result<(), DatabaseError> { diff --git a/src/optimizer/rule/normalization/pushdown_predicates.rs b/src/optimizer/rule/normalization/pushdown_predicates.rs index dce4b24f..b1c6e34a 100644 --- a/src/optimizer/rule/normalization/pushdown_predicates.rs +++ b/src/optimizer/rule/normalization/pushdown_predicates.rs @@ -272,7 +272,7 @@ mod tests { use crate::optimizer::heuristic::optimizer::HepOptimizer; use crate::optimizer::rule::normalization::NormalizationRuleImpl; use crate::planner::operator::Operator; - use crate::storage::kip::KipTransaction; + use crate::storage::kipdb::KipTransaction; use crate::types::value::DataValue; use crate::types::LogicalType; use std::collections::Bound; diff --git a/src/optimizer/rule/normalization/simplification.rs b/src/optimizer/rule/normalization/simplification.rs index c4fbf138..04d077fe 100644 --- a/src/optimizer/rule/normalization/simplification.rs +++ b/src/optimizer/rule/normalization/simplification.rs @@ -121,7 +121,7 @@ mod test { use crate::optimizer::rule::normalization::NormalizationRuleImpl; use crate::planner::operator::Operator; use crate::planner::LogicalPlan; - use crate::storage::kip::KipTransaction; + use crate::storage::kipdb::KipTransaction; use crate::types::value::DataValue; use crate::types::{ColumnId, LogicalType}; use std::collections::Bound; diff --git a/src/storage/kip.rs b/src/storage/kip.rs deleted file mode 100644 index ddf4263a..00000000 --- a/src/storage/kip.rs +++ /dev/null @@ -1,763 +0,0 @@ -use crate::catalog::{ColumnCatalog, ColumnRef, TableCatalog, TableMeta, TableName}; -use crate::errors::DatabaseError; -use crate::expression::range_detacher::Range; -use crate::optimizer::core::statistics_meta::{StatisticMetaLoader, StatisticsMeta}; -use crate::storage::table_codec::TableCodec; -use crate::storage::{ - Bounds, IndexImplEnum, IndexImplParams, IndexIter, Iter, Storage, Transaction, -}; -use crate::types::index::{Index, IndexId, IndexMetaRef, IndexType}; -use crate::types::tuple::{Tuple, TupleId}; -use crate::types::{ColumnId, LogicalType}; -use itertools::Itertools; -use kip_db::kernel::lsm::iterator::Iter as KipDBIter; -use kip_db::kernel::lsm::mvcc::{CheckType, TransactionIter}; -use kip_db::kernel::lsm::storage::Config; -use kip_db::kernel::lsm::{mvcc, storage}; -use kip_db::kernel::utils::lru_cache::ShardingLruCache; -use std::collections::hash_map::RandomState; -use std::collections::{Bound, VecDeque}; -use std::ops::SubAssign; -use std::path::PathBuf; -use std::sync::Arc; - -#[derive(Clone)] -pub struct KipStorage { - pub inner: Arc, - pub(crate) meta_cache: Arc, - pub(crate) table_cache: Arc>, -} - -impl KipStorage { - pub async fn new(path: impl Into + Send) -> Result { - let storage = - storage::KipStorage::open_with_config(Config::new(path).enable_level_0_memorization()) - .await?; - let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new()).unwrap()); - let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new()).unwrap()); - - Ok(KipStorage { - inner: 
Arc::new(storage), - meta_cache, - table_cache, - }) - } -} - -impl Storage for KipStorage { - type TransactionType = KipTransaction; - - async fn transaction(&self) -> Result { - let tx = self.inner.new_transaction(CheckType::Optimistic).await; - - Ok(KipTransaction { - tx, - table_cache: Arc::clone(&self.table_cache), - meta_cache: self.meta_cache.clone(), - }) - } -} - -pub(crate) type StatisticsMetaCache = ShardingLruCache<(TableName, IndexId), StatisticsMeta>; - -pub struct KipTransaction { - tx: mvcc::Transaction, - table_cache: Arc>, - meta_cache: Arc, -} - -impl Transaction for KipTransaction { - type IterType<'a> = KipIter<'a>; - - fn read( - &self, - table_name: TableName, - bounds: Bounds, - mut columns: Vec<(usize, ColumnRef)>, - ) -> Result, DatabaseError> { - assert!(columns.is_sorted_by_key(|(i, _)| i)); - assert!(columns.iter().map(|(i, _)| i).all_unique()); - - let table = self - .table(table_name.clone()) - .ok_or(DatabaseError::TableNotFound)?; - let table_types = table.types(); - if columns.is_empty() { - let (i, column) = table.primary_key()?; - columns.push((i, column.clone())); - } - let mut tuple_columns = Vec::with_capacity(columns.len()); - let mut projections = Vec::with_capacity(columns.len()); - for (projection, column) in columns { - tuple_columns.push(column); - projections.push(projection); - } - - let (min, max) = TableCodec::tuple_bound(&table_name); - let iter = self.tx.iter(Bound::Included(&min), Bound::Included(&max))?; - - Ok(KipIter { - offset: bounds.0.unwrap_or(0), - limit: bounds.1, - table_types, - tuple_columns: Arc::new(tuple_columns), - projections, - iter, - }) - } - - fn read_by_index( - &self, - table_name: TableName, - (offset_option, limit_option): Bounds, - columns: Vec<(usize, ColumnRef)>, - index_meta: IndexMetaRef, - ranges: Vec, - ) -> Result, DatabaseError> { - assert!(columns.is_sorted_by_key(|(i, _)| i)); - assert!(columns.iter().map(|(i, _)| i).all_unique()); - - let table = self - .table(table_name.clone()) - .ok_or(DatabaseError::TableNotFound)?; - let table_types = table.types(); - let table_name = table.name.as_str(); - let offset = offset_option.unwrap_or(0); - - let mut tuple_columns = Vec::with_capacity(columns.len()); - let mut projections = Vec::with_capacity(columns.len()); - for (projection, column) in columns { - tuple_columns.push(column); - projections.push(projection); - } - let inner = IndexImplEnum::instance(index_meta.ty); - - Ok(IndexIter { - offset, - limit: limit_option, - params: IndexImplParams { - tuple_schema_ref: Arc::new(tuple_columns), - projections, - index_meta, - table_name, - table_types, - tx: &self.tx, - }, - inner, - ranges: VecDeque::from(ranges), - scope_iter: None, - }) - } - - fn add_index_meta( - &mut self, - table_name: &TableName, - index_name: String, - column_ids: Vec, - ty: IndexType, - ) -> Result { - if let Some(mut table) = self.table(table_name.clone()).cloned() { - let index_meta = table.add_index_meta(index_name, column_ids, ty)?; - let (key, value) = TableCodec::encode_index_meta(table_name, index_meta)?; - self.tx.set(key, value); - self.table_cache.remove(table_name); - - Ok(index_meta.id) - } else { - Err(DatabaseError::TableNotFound) - } - } - - fn add_index( - &mut self, - table_name: &str, - index: Index, - tuple_id: &TupleId, - ) -> Result<(), DatabaseError> { - if matches!(index.ty, IndexType::PrimaryKey) { - return Ok(()); - } - let (key, value) = TableCodec::encode_index(table_name, &index, tuple_id)?; - - if matches!(index.ty, IndexType::Unique) { - if let Some(bytes) 
= self.tx.get(&key)? { - return if bytes != value { - Err(DatabaseError::DuplicateUniqueValue) - } else { - Ok(()) - }; - } - } - - self.tx.set(key, value); - - Ok(()) - } - - fn del_index( - &mut self, - table_name: &str, - index: &Index, - tuple_id: Option<&TupleId>, - ) -> Result<(), DatabaseError> { - if matches!(index.ty, IndexType::PrimaryKey) { - return Ok(()); - } - self.tx - .remove(&TableCodec::encode_index_key(table_name, index, tuple_id)?)?; - - Ok(()) - } - - fn append( - &mut self, - table_name: &str, - tuple: Tuple, - types: &[LogicalType], - is_overwrite: bool, - ) -> Result<(), DatabaseError> { - let (key, value) = TableCodec::encode_tuple(table_name, &tuple, types)?; - - if !is_overwrite && self.tx.get(&key)?.is_some() { - return Err(DatabaseError::DuplicatePrimaryKey); - } - self.tx.set(key, value); - - Ok(()) - } - - fn delete(&mut self, table_name: &str, tuple_id: TupleId) -> Result<(), DatabaseError> { - let key = TableCodec::encode_tuple_key(table_name, &tuple_id)?; - self.tx.remove(&key)?; - - Ok(()) - } - - fn add_column( - &mut self, - table_name: &TableName, - column: &ColumnCatalog, - if_not_exists: bool, - ) -> Result { - if let Some(mut table) = self.table(table_name.clone()).cloned() { - if !column.nullable && column.default_value()?.is_none() { - return Err(DatabaseError::NeedNullAbleOrDefault); - } - - for col in table.columns() { - if col.name() == column.name() { - return if if_not_exists { - Ok(col.id().unwrap()) - } else { - Err(DatabaseError::DuplicateColumn(column.name().to_string())) - }; - } - } - let col_id = table.add_column(column.clone())?; - - if column.desc.is_unique { - let meta_ref = table.add_index_meta( - format!("uk_{}", column.name()), - vec![col_id], - IndexType::Unique, - )?; - let (key, value) = TableCodec::encode_index_meta(table_name, meta_ref)?; - self.tx.set(key, value); - } - - let column = table.get_column_by_id(&col_id).unwrap(); - let (key, value) = TableCodec::encode_column(table_name, column)?; - self.tx.set(key, value); - self.table_cache.remove(table_name); - - Ok(col_id) - } else { - Err(DatabaseError::TableNotFound) - } - } - - fn drop_column( - &mut self, - table_name: &TableName, - column_name: &str, - ) -> Result<(), DatabaseError> { - if let Some(table_catalog) = self.table(table_name.clone()).cloned() { - let column = table_catalog.get_column_by_name(column_name).unwrap(); - - let (key, _) = TableCodec::encode_column(table_name, column)?; - self.tx.remove(&key)?; - - for index_meta in table_catalog.indexes.iter() { - if !index_meta.column_ids.contains(&column.id().unwrap()) { - continue; - } - let (index_meta_key, _) = TableCodec::encode_index_meta(table_name, index_meta)?; - self.tx.remove(&index_meta_key)?; - - let (index_min, index_max) = TableCodec::index_bound(table_name, &index_meta.id); - Self::_drop_data(&mut self.tx, &index_min, &index_max)?; - } - self.table_cache.remove(table_name); - - Ok(()) - } else { - Err(DatabaseError::TableNotFound) - } - } - - fn create_table( - &mut self, - table_name: TableName, - columns: Vec, - if_not_exists: bool, - ) -> Result { - let (table_key, value) = - TableCodec::encode_root_table(&TableMeta::empty(table_name.clone()))?; - if self.tx.get(&table_key)?.is_some() { - if if_not_exists { - return Ok(table_name); - } - return Err(DatabaseError::TableExists); - } - self.tx.set(table_key, value); - - let mut table_catalog = TableCatalog::new(table_name.clone(), columns)?; - - Self::create_index_meta_for_table(&mut self.tx, &mut table_catalog)?; - - for column in 
table_catalog.columns() { - let (key, value) = TableCodec::encode_column(&table_name, column)?; - self.tx.set(key, value); - } - self.table_cache.put(table_name.to_string(), table_catalog); - - Ok(table_name) - } - - fn drop_table(&mut self, table_name: &str, if_exists: bool) -> Result<(), DatabaseError> { - if self.table(Arc::new(table_name.to_string())).is_none() { - if if_exists { - return Ok(()); - } else { - return Err(DatabaseError::TableNotFound); - } - } - self.drop_data(table_name)?; - - let (column_min, column_max) = TableCodec::columns_bound(table_name); - Self::_drop_data(&mut self.tx, &column_min, &column_max)?; - - let (index_meta_min, index_meta_max) = TableCodec::index_meta_bound(table_name); - Self::_drop_data(&mut self.tx, &index_meta_min, &index_meta_max)?; - - self.tx - .remove(&TableCodec::encode_root_table_key(table_name))?; - - let _ = self.table_cache.remove(&table_name.to_string()); - - Ok(()) - } - - fn drop_data(&mut self, table_name: &str) -> Result<(), DatabaseError> { - let (tuple_min, tuple_max) = TableCodec::tuple_bound(table_name); - Self::_drop_data(&mut self.tx, &tuple_min, &tuple_max)?; - - let (index_min, index_max) = TableCodec::all_index_bound(table_name); - Self::_drop_data(&mut self.tx, &index_min, &index_max)?; - - Ok(()) - } - - fn table(&self, table_name: TableName) -> Option<&TableCatalog> { - let mut option = self.table_cache.get(&table_name); - - if option.is_none() { - // TODO: unify the data into a `Meta` prefix and use one iteration to collect all data - let (columns, indexes) = Self::table_collect(table_name.clone(), &self.tx).ok()?; - - if let Ok(catalog) = TableCatalog::reload(table_name.clone(), columns, indexes) { - option = self - .table_cache - .get_or_insert(table_name.to_string(), |_| Ok(catalog)) - .ok(); - } - } - - option - } - - fn table_metas(&self) -> Result, DatabaseError> { - let mut metas = vec![]; - let (min, max) = TableCodec::root_table_bound(); - let mut iter = self.tx.iter(Bound::Included(&min), Bound::Included(&max))?; - - while let Some((_, value_option)) = iter.try_next().ok().flatten() { - if let Some(value) = value_option { - let meta = TableCodec::decode_root_table(&value)?; - - metas.push(meta); - } - } - - Ok(metas) - } - - fn save_table_meta( - &mut self, - table_name: &TableName, - path: String, - statistics_meta: StatisticsMeta, - ) -> Result<(), DatabaseError> { - // TODO: clean old meta file - let index_id = statistics_meta.index_id(); - let _ = self - .meta_cache - .put((table_name.clone(), index_id), statistics_meta); - let (key, value) = TableCodec::encode_statistics_path(table_name.as_str(), index_id, path); - self.tx.set(key, value); - - Ok(()) - } - - fn table_meta_path( - &self, - table_name: &str, - index_id: IndexId, - ) -> Result, DatabaseError> { - let key = TableCodec::encode_statistics_path_key(table_name, index_id); - self.tx - .get(&key)? 
- .map(|bytes| TableCodec::decode_statistics_path(&bytes)) - .transpose() - } - - fn meta_loader(&self) -> StatisticMetaLoader - where - Self: Sized, - { - StatisticMetaLoader::new(self, &self.meta_cache) - } - - async fn commit(self) -> Result<(), DatabaseError> { - self.tx.commit().await?; - - Ok(()) - } -} - -impl KipTransaction { - fn table_collect( - table_name: TableName, - tx: &mvcc::Transaction, - ) -> Result<(Vec, Vec), DatabaseError> { - let (table_min, table_max) = TableCodec::table_bound(&table_name); - let mut column_iter = tx.iter(Bound::Included(&table_min), Bound::Included(&table_max))?; - - let mut columns = Vec::new(); - let mut index_metas = Vec::new(); - - // Tips: only `Column`, `IndexMeta`, `TableMeta` - while let Some((key, value_option)) = column_iter.try_next().ok().flatten() { - if let Some(value) = value_option { - if key.starts_with(&table_min) { - columns.push(TableCodec::decode_column(&value)?); - } else { - index_metas.push(Arc::new(TableCodec::decode_index_meta(&value)?)); - } - } - } - - Ok((columns, index_metas)) - } - - fn _drop_data(tx: &mut mvcc::Transaction, min: &[u8], max: &[u8]) -> Result<(), DatabaseError> { - let mut iter = tx.iter(Bound::Included(min), Bound::Included(max))?; - let mut data_keys = vec![]; - - while let Some((key, value_option)) = iter.try_next()? { - if value_option.is_some() { - data_keys.push(key); - } - } - drop(iter); - - for key in data_keys { - tx.remove(&key)? - } - - Ok(()) - } - - fn create_index_meta_for_table( - tx: &mut mvcc::Transaction, - table: &mut TableCatalog, - ) -> Result<(), DatabaseError> { - let table_name = table.name.clone(); - let index_column = table - .columns() - .filter(|column| column.desc.is_primary || column.desc.is_unique) - .map(|column| (column.id().unwrap(), column.clone())) - .collect_vec(); - - for (col_id, col) in index_column { - let is_primary = col.desc.is_primary; - let index_ty = if is_primary { - IndexType::PrimaryKey - } else if col.desc.is_unique { - IndexType::Unique - } else { - continue; - }; - // FIXME: composite indexes may exist on future - let prefix = if is_primary { "pk" } else { "uk" }; - - let meta_ref = table.add_index_meta( - format!("{}_{}", prefix, col.name()), - vec![col_id], - index_ty, - )?; - let (key, value) = TableCodec::encode_index_meta(&table_name, meta_ref)?; - tx.set(key, value); - } - Ok(()) - } -} - -pub struct KipIter<'a> { - offset: usize, - limit: Option, - table_types: Vec, - tuple_columns: Arc>, - projections: Vec, - iter: TransactionIter<'a>, -} - -impl Iter for KipIter<'_> { - fn next_tuple(&mut self) -> Result, DatabaseError> { - while self.offset > 0 { - let _ = self.iter.try_next()?; - self.offset -= 1; - } - - if let Some(num) = self.limit { - if num == 0 { - return Ok(None); - } - } - - while let Some(item) = self.iter.try_next()? 
{ - if let (_, Some(value)) = item { - let tuple = TableCodec::decode_tuple( - &self.table_types, - &self.projections, - &self.tuple_columns, - &value, - ); - - if let Some(num) = self.limit.as_mut() { - num.sub_assign(1); - } - - return Ok(Some(tuple)); - } - } - - Ok(None) - } -} - -#[cfg(test)] -mod test { - use crate::catalog::{ColumnCatalog, ColumnDesc}; - use crate::db::DataBaseBuilder; - use crate::errors::DatabaseError; - use crate::expression::range_detacher::Range; - use crate::storage::kip::KipStorage; - use crate::storage::{ - IndexImplEnum, IndexImplParams, IndexIter, Iter, PrimaryKeyIndexImpl, Storage, Transaction, - }; - use crate::types::index::{IndexMeta, IndexType}; - use crate::types::tuple::Tuple; - use crate::types::value::DataValue; - use crate::types::LogicalType; - use itertools::Itertools; - use std::collections::{Bound, VecDeque}; - use std::sync::Arc; - use tempfile::TempDir; - - #[tokio::test] - async fn test_in_kipdb_storage_works_with_data() -> Result<(), DatabaseError> { - let temp_dir = TempDir::new().expect("unable to create temporary working directory"); - let storage = KipStorage::new(temp_dir.path()).await?; - let mut transaction = storage.transaction().await?; - let columns = Arc::new(vec![ - Arc::new(ColumnCatalog::new( - "c1".to_string(), - false, - ColumnDesc::new(LogicalType::Integer, true, false, None), - )), - Arc::new(ColumnCatalog::new( - "c2".to_string(), - false, - ColumnDesc::new(LogicalType::Boolean, false, false, None), - )), - ]); - - let source_columns = columns - .iter() - .map(|col_ref| ColumnCatalog::clone(&col_ref)) - .collect_vec(); - let _ = transaction.create_table(Arc::new("test".to_string()), source_columns, false)?; - - let table_catalog = transaction.table(Arc::new("test".to_string())); - assert!(table_catalog.is_some()); - assert!(table_catalog - .unwrap() - .get_column_id_by_name(&"c1".to_string()) - .is_some()); - - transaction.append( - &"test".to_string(), - Tuple { - id: Some(Arc::new(DataValue::Int32(Some(1)))), - values: vec![ - Arc::new(DataValue::Int32(Some(1))), - Arc::new(DataValue::Boolean(Some(true))), - ], - }, - &[LogicalType::Integer, LogicalType::Boolean], - false, - )?; - transaction.append( - &"test".to_string(), - Tuple { - id: Some(Arc::new(DataValue::Int32(Some(2)))), - values: vec![ - Arc::new(DataValue::Int32(Some(2))), - Arc::new(DataValue::Boolean(Some(false))), - ], - }, - &[LogicalType::Integer, LogicalType::Boolean], - false, - )?; - - let mut iter = transaction.read( - Arc::new("test".to_string()), - (Some(1), Some(1)), - vec![(0, columns[0].clone())], - )?; - - let option_1 = iter.next_tuple()?; - assert_eq!( - option_1.unwrap().id, - Some(Arc::new(DataValue::Int32(Some(2)))) - ); - - let option_2 = iter.next_tuple()?; - assert_eq!(option_2, None); - - Ok(()) - } - - #[tokio::test] - async fn test_index_iter_pk() -> Result<(), DatabaseError> { - let temp_dir = TempDir::new().expect("unable to create temporary working directory"); - let fnck_sql = DataBaseBuilder::path(temp_dir.path()).build().await?; - - let _ = fnck_sql.run("create table t1 (a int primary key)").await?; - let _ = fnck_sql - .run("insert into t1 (a) values (0), (1), (2), (3), (4)") - .await?; - let transaction = fnck_sql.storage.transaction().await?; - - let table_name = Arc::new("t1".to_string()); - let table = transaction.table(table_name.clone()).unwrap().clone(); - let tuple_ids = vec![ - Arc::new(DataValue::Int32(Some(0))), - Arc::new(DataValue::Int32(Some(2))), - Arc::new(DataValue::Int32(Some(3))), - 
Arc::new(DataValue::Int32(Some(4))), - ]; - let mut iter = IndexIter { - offset: 0, - limit: None, - params: IndexImplParams { - tuple_schema_ref: table.schema_ref().clone(), - projections: vec![0], - index_meta: Arc::new(IndexMeta { - id: 0, - column_ids: vec![0], - table_name, - pk_ty: LogicalType::Integer, - name: "pk_a".to_string(), - ty: IndexType::PrimaryKey, - }), - table_name: &table.name, - table_types: table.types(), - tx: &transaction.tx, - }, - ranges: VecDeque::from(vec![ - Range::Eq(Arc::new(DataValue::Int32(Some(0)))), - Range::Scope { - min: Bound::Included(Arc::new(DataValue::Int32(Some(2)))), - max: Bound::Included(Arc::new(DataValue::Int32(Some(4)))), - }, - ]), - scope_iter: None, - inner: IndexImplEnum::PrimaryKey(PrimaryKeyIndexImpl), - }; - let mut result = Vec::new(); - - while let Some(tuple) = iter.next_tuple()? { - result.push(tuple.id.unwrap()); - } - - assert_eq!(result, tuple_ids); - - Ok(()) - } - - #[tokio::test] - async fn test_read_by_index() -> Result<(), DatabaseError> { - let temp_dir = TempDir::new().expect("unable to create temporary working directory"); - let fnck_sql = DataBaseBuilder::path(temp_dir.path()).build().await?; - let _ = fnck_sql - .run("create table t1 (a int primary key, b int unique)") - .await?; - let _ = fnck_sql - .run("insert into t1 (a, b) values (0, 0), (1, 1), (2, 2)") - .await?; - let transaction = fnck_sql.storage.transaction().await.unwrap(); - - let table = transaction - .table(Arc::new("t1".to_string())) - .unwrap() - .clone(); - let columns = table.columns().cloned().enumerate().collect_vec(); - let mut iter = transaction - .read_by_index( - Arc::new("t1".to_string()), - (Some(0), Some(1)), - columns, - table.indexes[0].clone(), - vec![Range::Scope { - min: Bound::Excluded(Arc::new(DataValue::Int32(Some(0)))), - max: Bound::Unbounded, - }], - ) - .unwrap(); - - while let Some(tuple) = iter.next_tuple()? 
{ - assert_eq!(tuple.id, Some(Arc::new(DataValue::Int32(Some(1))))); - assert_eq!( - tuple.values, - vec![ - Arc::new(DataValue::Int32(Some(1))), - Arc::new(DataValue::Int32(Some(1))) - ] - ) - } - - Ok(()) - } -} diff --git a/src/storage/kipdb.rs b/src/storage/kipdb.rs new file mode 100644 index 00000000..40c37f96 --- /dev/null +++ b/src/storage/kipdb.rs @@ -0,0 +1,311 @@ +use crate::catalog::TableCatalog; +use crate::errors::DatabaseError; +use crate::storage::{InnerIter, StatisticsMetaCache, Storage, Transaction}; +use bytes::Bytes; +use kip_db::kernel::lsm::iterator::Iter as KipDBIter; +use kip_db::kernel::lsm::mvcc::{CheckType, TransactionIter}; +use kip_db::kernel::lsm::storage::Config; +use kip_db::kernel::lsm::{mvcc, storage}; +use kip_db::kernel::utils::lru_cache::ShardingLruCache; +use std::collections::hash_map::RandomState; +use std::collections::Bound; +use std::path::PathBuf; +use std::sync::Arc; + +#[derive(Clone)] +pub struct KipStorage { + pub inner: Arc, + pub(crate) meta_cache: Arc, + pub(crate) table_cache: Arc>, +} + +impl KipStorage { + pub async fn new(path: impl Into + Send) -> Result { + let storage = + storage::KipStorage::open_with_config(Config::new(path).enable_level_0_memorization()) + .await?; + let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new()).unwrap()); + let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new()).unwrap()); + + Ok(KipStorage { + inner: Arc::new(storage), + meta_cache, + table_cache, + }) + } +} + +impl Storage for KipStorage { + type TransactionType = KipTransaction; + + async fn transaction(&self) -> Result { + let tx = self.inner.new_transaction(CheckType::Optimistic).await; + + Ok(KipTransaction { + tx, + table_cache: Arc::clone(&self.table_cache), + meta_cache: self.meta_cache.clone(), + }) + } +} + +pub struct KipTransaction { + tx: mvcc::Transaction, + table_cache: Arc>, + meta_cache: Arc, +} + +impl Transaction for KipTransaction { + type IterType<'a> = KipIter<'a>; + + fn get(&self, key: &[u8]) -> Result, DatabaseError> { + Ok(self.tx.get(key)?) + } + + fn range<'a>( + &'a self, + min: Bound<&[u8]>, + max: Bound<&[u8]>, + ) -> Result, DatabaseError> { + Ok(KipIter { + iter: self.tx.iter(min, max)?, + }) + } + + fn set(&mut self, key: Bytes, value: Bytes) -> Result<(), DatabaseError> { + self.tx.set(key, value); + + Ok(()) + } + + fn remove(&mut self, key: &[u8]) -> Result<(), DatabaseError> { + self.tx.remove(key)?; + + Ok(()) + } + + fn table_cache(&self) -> &ShardingLruCache { + self.table_cache.as_ref() + } + + fn meta_cache(&self) -> &StatisticsMetaCache { + self.meta_cache.as_ref() + } + + async fn commit(self) -> Result<(), DatabaseError> { + self.tx.commit().await?; + + Ok(()) + } +} + +pub struct KipIter<'a> { + iter: TransactionIter<'a>, +} + +impl InnerIter for KipIter<'_> { + fn try_next(&mut self) -> Result, DatabaseError> { + while let Some((key, value_option)) = self.iter.try_next()? 
{ + if let Some(value) = value_option { + return Ok(Some((key, value))); + } + } + Ok(None) + } +} + +#[cfg(test)] +mod test { + use crate::catalog::{ColumnCatalog, ColumnDesc}; + use crate::db::DataBaseBuilder; + use crate::errors::DatabaseError; + use crate::expression::range_detacher::Range; + use crate::storage::kipdb::KipStorage; + use crate::storage::{ + IndexImplEnum, IndexImplParams, IndexIter, Iter, PrimaryKeyIndexImpl, Storage, Transaction, + }; + use crate::types::index::{IndexMeta, IndexType}; + use crate::types::tuple::Tuple; + use crate::types::value::DataValue; + use crate::types::LogicalType; + use itertools::Itertools; + use std::collections::{Bound, VecDeque}; + use std::sync::Arc; + use tempfile::TempDir; + + #[tokio::test] + async fn test_in_kipdb_storage_works_with_data() -> Result<(), DatabaseError> { + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); + let storage = KipStorage::new(temp_dir.path()).await?; + let mut transaction = storage.transaction().await?; + let columns = Arc::new(vec![ + Arc::new(ColumnCatalog::new( + "c1".to_string(), + false, + ColumnDesc::new(LogicalType::Integer, true, false, None), + )), + Arc::new(ColumnCatalog::new( + "c2".to_string(), + false, + ColumnDesc::new(LogicalType::Boolean, false, false, None), + )), + ]); + + let source_columns = columns + .iter() + .map(|col_ref| ColumnCatalog::clone(&col_ref)) + .collect_vec(); + let _ = transaction.create_table(Arc::new("test".to_string()), source_columns, false)?; + + let table_catalog = transaction.table(Arc::new("test".to_string())); + assert!(table_catalog.is_some()); + assert!(table_catalog + .unwrap() + .get_column_id_by_name(&"c1".to_string()) + .is_some()); + + transaction.append( + &"test".to_string(), + Tuple { + id: Some(Arc::new(DataValue::Int32(Some(1)))), + values: vec![ + Arc::new(DataValue::Int32(Some(1))), + Arc::new(DataValue::Boolean(Some(true))), + ], + }, + &[LogicalType::Integer, LogicalType::Boolean], + false, + )?; + transaction.append( + &"test".to_string(), + Tuple { + id: Some(Arc::new(DataValue::Int32(Some(2)))), + values: vec![ + Arc::new(DataValue::Int32(Some(2))), + Arc::new(DataValue::Boolean(Some(false))), + ], + }, + &[LogicalType::Integer, LogicalType::Boolean], + false, + )?; + + let mut iter = transaction.read( + Arc::new("test".to_string()), + (Some(1), Some(1)), + vec![(0, columns[0].clone())], + )?; + + let option_1 = iter.next_tuple()?; + assert_eq!( + option_1.unwrap().id, + Some(Arc::new(DataValue::Int32(Some(2)))) + ); + + let option_2 = iter.next_tuple()?; + assert_eq!(option_2, None); + + Ok(()) + } + + #[tokio::test] + async fn test_index_iter_pk() -> Result<(), DatabaseError> { + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); + let fnck_sql = DataBaseBuilder::path(temp_dir.path()).build().await?; + + let _ = fnck_sql.run("create table t1 (a int primary key)").await?; + let _ = fnck_sql + .run("insert into t1 (a) values (0), (1), (2), (3), (4)") + .await?; + let transaction = fnck_sql.storage.transaction().await?; + + let table_name = Arc::new("t1".to_string()); + let table = transaction.table(table_name.clone()).unwrap().clone(); + let tuple_ids = vec![ + Arc::new(DataValue::Int32(Some(0))), + Arc::new(DataValue::Int32(Some(2))), + Arc::new(DataValue::Int32(Some(3))), + Arc::new(DataValue::Int32(Some(4))), + ]; + let mut iter = IndexIter { + offset: 0, + limit: None, + params: IndexImplParams { + tuple_schema_ref: table.schema_ref().clone(), + projections: vec![0], + 
index_meta: Arc::new(IndexMeta { + id: 0, + column_ids: vec![0], + table_name, + pk_ty: LogicalType::Integer, + name: "pk_a".to_string(), + ty: IndexType::PrimaryKey, + }), + table_name: &table.name, + table_types: table.types(), + tx: &transaction, + }, + ranges: VecDeque::from(vec![ + Range::Eq(Arc::new(DataValue::Int32(Some(0)))), + Range::Scope { + min: Bound::Included(Arc::new(DataValue::Int32(Some(2)))), + max: Bound::Included(Arc::new(DataValue::Int32(Some(4)))), + }, + ]), + scope_iter: None, + inner: IndexImplEnum::PrimaryKey(PrimaryKeyIndexImpl), + }; + let mut result = Vec::new(); + + while let Some(tuple) = iter.next_tuple()? { + result.push(tuple.id.unwrap()); + } + + assert_eq!(result, tuple_ids); + + Ok(()) + } + + #[tokio::test] + async fn test_read_by_index() -> Result<(), DatabaseError> { + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); + let fnck_sql = DataBaseBuilder::path(temp_dir.path()).build().await?; + let _ = fnck_sql + .run("create table t1 (a int primary key, b int unique)") + .await?; + let _ = fnck_sql + .run("insert into t1 (a, b) values (0, 0), (1, 1), (2, 2)") + .await?; + let transaction = fnck_sql.storage.transaction().await.unwrap(); + + let table = transaction + .table(Arc::new("t1".to_string())) + .unwrap() + .clone(); + let columns = table.columns().cloned().enumerate().collect_vec(); + let mut iter = transaction + .read_by_index( + Arc::new("t1".to_string()), + (Some(0), Some(1)), + columns, + table.indexes[0].clone(), + vec![Range::Scope { + min: Bound::Excluded(Arc::new(DataValue::Int32(Some(0)))), + max: Bound::Unbounded, + }], + ) + .unwrap(); + + while let Some(tuple) = iter.next_tuple()? { + assert_eq!(tuple.id, Some(Arc::new(DataValue::Int32(Some(1))))); + assert_eq!( + tuple.values, + vec![ + Arc::new(DataValue::Int32(Some(1))), + Arc::new(DataValue::Int32(Some(1))) + ] + ) + } + + Ok(()) + } +} diff --git a/src/storage/mod.rs b/src/storage/mod.rs index d434fc21..4326c33a 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -1,4 +1,4 @@ -pub mod kip; +pub mod kipdb; mod table_codec; use crate::catalog::{ColumnCatalog, ColumnRef, TableCatalog, TableMeta, TableName}; @@ -11,13 +11,15 @@ use crate::types::tuple::{Tuple, TupleId}; use crate::types::value::{DataValue, ValueRef}; use crate::types::{ColumnId, LogicalType}; use bytes::Bytes; -use kip_db::kernel::lsm::iterator::Iter as DBIter; -use kip_db::kernel::lsm::mvcc; +use itertools::Itertools; +use kip_db::kernel::utils::lru_cache::ShardingLruCache; use std::collections::{Bound, VecDeque}; use std::ops::SubAssign; use std::sync::Arc; use std::{mem, slice}; +pub(crate) type StatisticsMetaCache = ShardingLruCache<(TableName, IndexId), StatisticsMeta>; + pub trait Storage: Sync + Send + Clone + 'static { type TransactionType: Transaction; @@ -28,8 +30,8 @@ pub trait Storage: Sync + Send + Clone + 'static { /// Optional bounds of the reader, of the form (offset, limit). pub(crate) type Bounds = (Option, Option); -pub trait Transaction: Sync + Send + 'static { - type IterType<'a>: Iter; +pub trait Transaction: Sync + Send + 'static + Sized { + type IterType<'a>: InnerIter; /// The bounds is applied to the whole data batches, not per batch. 
/// @@ -38,17 +40,81 @@ pub trait Transaction: Sync + Send + 'static { &self, table_name: TableName, bounds: Bounds, - columns: Vec<(usize, ColumnRef)>, - ) -> Result, DatabaseError>; + mut columns: Vec<(usize, ColumnRef)>, + ) -> Result, DatabaseError> { + assert!(columns.is_sorted_by_key(|(i, _)| i)); + assert!(columns.iter().map(|(i, _)| i).all_unique()); + + let table = self + .table(table_name.clone()) + .ok_or(DatabaseError::TableNotFound)?; + let table_types = table.types(); + if columns.is_empty() { + let (i, column) = table.primary_key()?; + columns.push((i, column.clone())); + } + let mut tuple_columns = Vec::with_capacity(columns.len()); + let mut projections = Vec::with_capacity(columns.len()); + for (projection, column) in columns { + tuple_columns.push(column); + projections.push(projection); + } + + let (min, max) = TableCodec::tuple_bound(&table_name); + let iter = self.range(Bound::Included(&min), Bound::Included(&max))?; + + Ok(TupleIter { + offset: bounds.0.unwrap_or(0), + limit: bounds.1, + table_types, + tuple_columns: Arc::new(tuple_columns), + projections, + iter, + }) + } fn read_by_index( &self, table_name: TableName, - bounds: Bounds, + (offset_option, limit_option): Bounds, columns: Vec<(usize, ColumnRef)>, index_meta: IndexMetaRef, ranges: Vec, - ) -> Result, DatabaseError>; + ) -> Result, DatabaseError> { + assert!(columns.is_sorted_by_key(|(i, _)| i)); + assert!(columns.iter().map(|(i, _)| i).all_unique()); + + let table = self + .table(table_name.clone()) + .ok_or(DatabaseError::TableNotFound)?; + let table_types = table.types(); + let table_name = table.name.as_str(); + let offset = offset_option.unwrap_or(0); + + let mut tuple_columns = Vec::with_capacity(columns.len()); + let mut projections = Vec::with_capacity(columns.len()); + for (projection, column) in columns { + tuple_columns.push(column); + projections.push(projection); + } + let inner = IndexImplEnum::instance(index_meta.ty); + + Ok(IndexIter { + offset, + limit: limit_option, + params: IndexImplParams { + tuple_schema_ref: Arc::new(tuple_columns), + projections, + index_meta, + table_name, + table_types, + tx: self, + }, + inner, + ranges: VecDeque::from(ranges), + scope_iter: None, + }) + } fn add_index_meta( &mut self, @@ -56,21 +122,57 @@ pub trait Transaction: Sync + Send + 'static { index_name: String, column_ids: Vec, ty: IndexType, - ) -> Result; + ) -> Result { + if let Some(mut table) = self.table(table_name.clone()).cloned() { + let index_meta = table.add_index_meta(index_name, column_ids, ty)?; + let (key, value) = TableCodec::encode_index_meta(table_name, index_meta)?; + self.set(key, value)?; + self.table_cache().remove(table_name); + + Ok(index_meta.id) + } else { + Err(DatabaseError::TableNotFound) + } + } fn add_index( &mut self, table_name: &str, index: Index, tuple_id: &TupleId, - ) -> Result<(), DatabaseError>; + ) -> Result<(), DatabaseError> { + if matches!(index.ty, IndexType::PrimaryKey) { + return Ok(()); + } + let (key, value) = TableCodec::encode_index(table_name, &index, tuple_id)?; + + if matches!(index.ty, IndexType::Unique) { + if let Some(bytes) = self.get(&key)? 
{ + return if bytes != value { + Err(DatabaseError::DuplicateUniqueValue) + } else { + Ok(()) + }; + } + } + self.set(key, value)?; + + Ok(()) + } fn del_index( &mut self, table_name: &str, index: &Index, tuple_id: Option<&TupleId>, - ) -> Result<(), DatabaseError>; + ) -> Result<(), DatabaseError> { + if matches!(index.ty, IndexType::PrimaryKey) { + return Ok(()); + } + self.remove(&TableCodec::encode_index_key(table_name, index, tuple_id)?)?; + + Ok(()) + } fn append( &mut self, @@ -78,62 +180,334 @@ pub trait Transaction: Sync + Send + 'static { tuple: Tuple, types: &[LogicalType], is_overwrite: bool, - ) -> Result<(), DatabaseError>; + ) -> Result<(), DatabaseError> { + let (key, value) = TableCodec::encode_tuple(table_name, &tuple, types)?; - fn delete(&mut self, table_name: &str, tuple_id: TupleId) -> Result<(), DatabaseError>; + if !is_overwrite && self.get(&key)?.is_some() { + return Err(DatabaseError::DuplicatePrimaryKey); + } + self.set(key, value)?; + + Ok(()) + } + + fn delete(&mut self, table_name: &str, tuple_id: TupleId) -> Result<(), DatabaseError> { + let key = TableCodec::encode_tuple_key(table_name, &tuple_id)?; + self.remove(&key)?; + + Ok(()) + } fn add_column( &mut self, table_name: &TableName, column: &ColumnCatalog, if_not_exists: bool, - ) -> Result; + ) -> Result { + if let Some(mut table) = self.table(table_name.clone()).cloned() { + if !column.nullable && column.default_value()?.is_none() { + return Err(DatabaseError::NeedNullAbleOrDefault); + } - fn drop_column(&mut self, table_name: &TableName, column: &str) -> Result<(), DatabaseError>; + for col in table.columns() { + if col.name() == column.name() { + return if if_not_exists { + Ok(col.id().unwrap()) + } else { + Err(DatabaseError::DuplicateColumn(column.name().to_string())) + }; + } + } + let col_id = table.add_column(column.clone())?; + + if column.desc.is_unique { + let meta_ref = table.add_index_meta( + format!("uk_{}", column.name()), + vec![col_id], + IndexType::Unique, + )?; + let (key, value) = TableCodec::encode_index_meta(table_name, meta_ref)?; + self.set(key, value)?; + } + + let column = table.get_column_by_id(&col_id).unwrap(); + let (key, value) = TableCodec::encode_column(table_name, column)?; + self.set(key, value)?; + self.table_cache().remove(table_name); + + Ok(col_id) + } else { + Err(DatabaseError::TableNotFound) + } + } + + fn drop_column( + &mut self, + table_name: &TableName, + column_name: &str, + ) -> Result<(), DatabaseError> { + if let Some(table_catalog) = self.table(table_name.clone()).cloned() { + let column = table_catalog.get_column_by_name(column_name).unwrap(); + + let (key, _) = TableCodec::encode_column(table_name, column)?; + self.remove(&key)?; + + for index_meta in table_catalog.indexes.iter() { + if !index_meta.column_ids.contains(&column.id().unwrap()) { + continue; + } + let (index_meta_key, _) = TableCodec::encode_index_meta(table_name, index_meta)?; + self.remove(&index_meta_key)?; + + let (index_min, index_max) = TableCodec::index_bound(table_name, &index_meta.id); + self._drop_data(&index_min, &index_max)?; + } + self.table_cache().remove(table_name); + + Ok(()) + } else { + Err(DatabaseError::TableNotFound) + } + } fn create_table( &mut self, table_name: TableName, columns: Vec, if_not_exists: bool, - ) -> Result; + ) -> Result { + let (table_key, value) = + TableCodec::encode_root_table(&TableMeta::empty(table_name.clone()))?; + if self.get(&table_key)?.is_some() { + if if_not_exists { + return Ok(table_name); + } + return Err(DatabaseError::TableExists); + 
} + self.set(table_key, value)?; + + let mut table_catalog = TableCatalog::new(table_name.clone(), columns)?; + + self.create_index_meta_for_table(&mut table_catalog)?; + + for column in table_catalog.columns() { + let (key, value) = TableCodec::encode_column(&table_name, column)?; + self.set(key, value)?; + } + self.table_cache() + .put(table_name.to_string(), table_catalog); + + Ok(table_name) + } + + fn drop_table(&mut self, table_name: &str, if_exists: bool) -> Result<(), DatabaseError> { + if self.table(Arc::new(table_name.to_string())).is_none() { + if if_exists { + return Ok(()); + } else { + return Err(DatabaseError::TableNotFound); + } + } + self.drop_data(table_name)?; + + let (column_min, column_max) = TableCodec::columns_bound(table_name); + self._drop_data(&column_min, &column_max)?; + + let (index_meta_min, index_meta_max) = TableCodec::index_meta_bound(table_name); + self._drop_data(&index_meta_min, &index_meta_max)?; + + self.remove(&TableCodec::encode_root_table_key(table_name))?; + self.table_cache().remove(&table_name.to_string()); + + Ok(()) + } + + fn drop_data(&mut self, table_name: &str) -> Result<(), DatabaseError> { + let (tuple_min, tuple_max) = TableCodec::tuple_bound(table_name); + self._drop_data(&tuple_min, &tuple_max)?; + + let (index_min, index_max) = TableCodec::all_index_bound(table_name); + self._drop_data(&index_min, &index_max)?; + + Ok(()) + } + + fn table(&self, table_name: TableName) -> Option<&TableCatalog> { + let mut option = self.table_cache().get(&table_name); + + if option.is_none() { + // TODO: unify the data into a `Meta` prefix and use one iteration to collect all data + let (columns, indexes) = self.table_collect(table_name.clone()).ok()?; + + if let Ok(catalog) = TableCatalog::reload(table_name.clone(), columns, indexes) { + option = self + .table_cache() + .get_or_insert(table_name.to_string(), |_| Ok(catalog)) + .ok(); + } + } + + option + } + + fn table_metas(&self) -> Result, DatabaseError> { + let mut metas = vec![]; + let (min, max) = TableCodec::root_table_bound(); + let mut iter = self.range(Bound::Included(&min), Bound::Included(&max))?; + + while let Some((_, value)) = iter.try_next().ok().flatten() { + let meta = TableCodec::decode_root_table(&value)?; + + metas.push(meta); + } + + Ok(metas) + } - fn drop_table(&mut self, table_name: &str, if_exists: bool) -> Result<(), DatabaseError>; - fn drop_data(&mut self, table_name: &str) -> Result<(), DatabaseError>; - fn table(&self, table_name: TableName) -> Option<&TableCatalog>; - fn table_metas(&self) -> Result, DatabaseError>; fn save_table_meta( &mut self, table_name: &TableName, path: String, statistics_meta: StatisticsMeta, - ) -> Result<(), DatabaseError>; + ) -> Result<(), DatabaseError> { + // TODO: clean old meta file + let index_id = statistics_meta.index_id(); + let _ = self + .meta_cache() + .put((table_name.clone(), index_id), statistics_meta); + let (key, value) = TableCodec::encode_statistics_path(table_name.as_str(), index_id, path); + + self.set(key, value)?; + + Ok(()) + } + fn table_meta_path( &self, table_name: &str, index_id: IndexId, - ) -> Result, DatabaseError>; + ) -> Result, DatabaseError> { + let key = TableCodec::encode_statistics_path_key(table_name, index_id); + self.get(&key)? 
+ .map(|bytes| TableCodec::decode_statistics_path(&bytes)) + .transpose() + } + fn meta_loader(&self) -> StatisticMetaLoader where - Self: Sized; + Self: Sized, + { + StatisticMetaLoader::new(self, self.meta_cache()) + } + + fn table_collect( + &self, + table_name: TableName, + ) -> Result<(Vec, Vec), DatabaseError> { + let (table_min, table_max) = TableCodec::table_bound(&table_name); + let mut column_iter = + self.range(Bound::Included(&table_min), Bound::Included(&table_max))?; + + let mut columns = Vec::new(); + let mut index_metas = Vec::new(); + + // Tips: only `Column`, `IndexMeta`, `TableMeta` + while let Some((key, value)) = column_iter.try_next().ok().flatten() { + if key.starts_with(&table_min) { + columns.push(TableCodec::decode_column(&value)?); + } else { + index_metas.push(Arc::new(TableCodec::decode_index_meta(&value)?)); + } + } + + Ok((columns, index_metas)) + } + + fn _drop_data(&mut self, min: &[u8], max: &[u8]) -> Result<(), DatabaseError> { + let mut iter = self.range(Bound::Included(min), Bound::Included(max))?; + let mut data_keys = vec![]; + + while let Some((key, _)) = iter.try_next()? { + data_keys.push(key); + } + drop(iter); + + for key in data_keys { + self.remove(&key)? + } + + Ok(()) + } + + fn create_index_meta_for_table( + &mut self, + table: &mut TableCatalog, + ) -> Result<(), DatabaseError> { + let table_name = table.name.clone(); + let index_column = table + .columns() + .filter(|column| column.desc.is_primary || column.desc.is_unique) + .map(|column| (column.id().unwrap(), column.clone())) + .collect_vec(); + + for (col_id, col) in index_column { + let is_primary = col.desc.is_primary; + let index_ty = if is_primary { + IndexType::PrimaryKey + } else if col.desc.is_unique { + IndexType::Unique + } else { + continue; + }; + // FIXME: composite indexes may exist on future + let prefix = if is_primary { "pk" } else { "uk" }; + + let meta_ref = table.add_index_meta( + format!("{}_{}", prefix, col.name()), + vec![col_id], + index_ty, + )?; + let (key, value) = TableCodec::encode_index_meta(&table_name, meta_ref)?; + self.set(key, value)?; + } + Ok(()) + } + + fn get(&self, key: &[u8]) -> Result, DatabaseError>; + + fn set(&mut self, key: Bytes, value: Bytes) -> Result<(), DatabaseError>; + + fn remove(&mut self, key: &[u8]) -> Result<(), DatabaseError>; + + fn range<'a>( + &'a self, + min: Bound<&[u8]>, + max: Bound<&[u8]>, + ) -> Result, DatabaseError>; + + fn table_cache(&self) -> &ShardingLruCache; + fn meta_cache(&self) -> &StatisticsMetaCache; #[allow(async_fn_in_trait)] async fn commit(self) -> Result<(), DatabaseError>; } -trait IndexImpl { - fn index_lookup(&self, bytes: &Bytes, params: &IndexImplParams) - -> Result; +trait IndexImpl { + fn index_lookup( + &self, + bytes: &Bytes, + params: &IndexImplParams, + ) -> Result; fn eq_to_res<'a>( &self, value: &ValueRef, - params: &IndexImplParams<'a>, - ) -> Result, DatabaseError>; + params: &IndexImplParams<'a, T>, + ) -> Result, DatabaseError>; fn bound_key( &self, - params: &IndexImplParams, + params: &IndexImplParams, value: &ValueRef, is_upper: bool, ) -> Result, DatabaseError>; @@ -162,17 +536,17 @@ struct UniqueIndexImpl; struct NormalIndexImpl; struct CompositeIndexImpl; -struct IndexImplParams<'a> { +struct IndexImplParams<'a, T: Transaction> { tuple_schema_ref: Arc>, projections: Vec, index_meta: IndexMetaRef, table_name: &'a str, table_types: Vec, - tx: &'a mvcc::Transaction, + tx: &'a T, } -impl IndexImplParams<'_> { +impl IndexImplParams<'_, T> { fn get_tuple_by_id(&self, tuple_id: &TupleId) 
-> Result, DatabaseError> { let key = TableCodec::encode_tuple_key(self.table_name, tuple_id)?; @@ -187,16 +561,16 @@ impl IndexImplParams<'_> { } } -enum IndexResult<'a> { +enum IndexResult<'a, T: Transaction> { Tuple(Tuple), - Scope(mvcc::TransactionIter<'a>), + Scope(T::IterType<'a>), } -impl IndexImpl for IndexImplEnum { +impl IndexImpl for IndexImplEnum { fn index_lookup( &self, bytes: &Bytes, - params: &IndexImplParams, + params: &IndexImplParams, ) -> Result { match self { IndexImplEnum::PrimaryKey(inner) => inner.index_lookup(bytes, params), @@ -209,8 +583,8 @@ impl IndexImpl for IndexImplEnum { fn eq_to_res<'a>( &self, value: &ValueRef, - params: &IndexImplParams<'a>, - ) -> Result, DatabaseError> { + params: &IndexImplParams<'a, T>, + ) -> Result, DatabaseError> { match self { IndexImplEnum::PrimaryKey(inner) => inner.eq_to_res(value, params), IndexImplEnum::Unique(inner) => inner.eq_to_res(value, params), @@ -221,7 +595,7 @@ impl IndexImpl for IndexImplEnum { fn bound_key( &self, - params: &IndexImplParams, + params: &IndexImplParams, value: &ValueRef, is_upper: bool, ) -> Result, DatabaseError> { @@ -234,11 +608,11 @@ impl IndexImpl for IndexImplEnum { } } -impl IndexImpl for PrimaryKeyIndexImpl { +impl IndexImpl for PrimaryKeyIndexImpl { fn index_lookup( &self, bytes: &Bytes, - params: &IndexImplParams, + params: &IndexImplParams, ) -> Result { Ok(TableCodec::decode_tuple( ¶ms.table_types, @@ -251,8 +625,8 @@ impl IndexImpl for PrimaryKeyIndexImpl { fn eq_to_res<'a>( &self, value: &ValueRef, - params: &IndexImplParams<'a>, - ) -> Result, DatabaseError> { + params: &IndexImplParams<'a, T>, + ) -> Result, DatabaseError> { let bytes = params .tx .get(&TableCodec::encode_tuple_key(params.table_name, value)?)? @@ -270,7 +644,7 @@ impl IndexImpl for PrimaryKeyIndexImpl { fn bound_key( &self, - params: &IndexImplParams, + params: &IndexImplParams, val: &ValueRef, _: bool, ) -> Result, DatabaseError> { @@ -278,18 +652,21 @@ impl IndexImpl for PrimaryKeyIndexImpl { } } -fn secondary_index_lookup(bytes: &Bytes, params: &IndexImplParams) -> Result { +fn secondary_index_lookup( + bytes: &Bytes, + params: &IndexImplParams, +) -> Result { let tuple_id = TableCodec::decode_index(bytes, ¶ms.index_meta.pk_ty); params .get_tuple_by_id(&tuple_id)? .ok_or_else(|| DatabaseError::NotFound("index's tuple_id", tuple_id.to_string())) } -impl IndexImpl for UniqueIndexImpl { +impl IndexImpl for UniqueIndexImpl { fn index_lookup( &self, bytes: &Bytes, - params: &IndexImplParams, + params: &IndexImplParams, ) -> Result { secondary_index_lookup(bytes, params) } @@ -297,8 +674,8 @@ impl IndexImpl for UniqueIndexImpl { fn eq_to_res<'a>( &self, value: &ValueRef, - params: &IndexImplParams<'a>, - ) -> Result, DatabaseError> { + params: &IndexImplParams<'a, T>, + ) -> Result, DatabaseError> { let bytes = params .tx .get(&self.bound_key(params, value, false)?)? 
@@ -314,7 +691,7 @@ impl IndexImpl for UniqueIndexImpl { fn bound_key( &self, - params: &IndexImplParams, + params: &IndexImplParams, value: &ValueRef, _: bool, ) -> Result, DatabaseError> { @@ -328,11 +705,11 @@ impl IndexImpl for UniqueIndexImpl { } } -impl IndexImpl for NormalIndexImpl { +impl IndexImpl for NormalIndexImpl { fn index_lookup( &self, bytes: &Bytes, - params: &IndexImplParams, + params: &IndexImplParams, ) -> Result { secondary_index_lookup(bytes, params) } @@ -340,12 +717,12 @@ impl IndexImpl for NormalIndexImpl { fn eq_to_res<'a>( &self, value: &ValueRef, - params: &IndexImplParams<'a>, - ) -> Result, DatabaseError> { + params: &IndexImplParams<'a, T>, + ) -> Result, DatabaseError> { let min = self.bound_key(params, value, false)?; let max = self.bound_key(params, value, true)?; - let iter = params.tx.iter( + let iter = params.tx.range( Bound::Included(min.as_slice()), Bound::Included(max.as_slice()), )?; @@ -354,7 +731,7 @@ impl IndexImpl for NormalIndexImpl { fn bound_key( &self, - params: &IndexImplParams, + params: &IndexImplParams, value: &ValueRef, is_upper: bool, ) -> Result, DatabaseError> { @@ -368,11 +745,11 @@ impl IndexImpl for NormalIndexImpl { } } -impl IndexImpl for CompositeIndexImpl { +impl IndexImpl for CompositeIndexImpl { fn index_lookup( &self, bytes: &Bytes, - params: &IndexImplParams, + params: &IndexImplParams, ) -> Result { secondary_index_lookup(bytes, params) } @@ -380,12 +757,12 @@ impl IndexImpl for CompositeIndexImpl { fn eq_to_res<'a>( &self, value: &ValueRef, - params: &IndexImplParams<'a>, - ) -> Result, DatabaseError> { + params: &IndexImplParams<'a, T>, + ) -> Result, DatabaseError> { let min = self.bound_key(params, value, false)?; let max = self.bound_key(params, value, true)?; - let iter = params.tx.iter( + let iter = params.tx.range( Bound::Included(min.as_slice()), Bound::Included(max.as_slice()), )?; @@ -394,7 +771,7 @@ impl IndexImpl for CompositeIndexImpl { fn bound_key( &self, - params: &IndexImplParams, + params: &IndexImplParams, value: &ValueRef, is_upper: bool, ) -> Result, DatabaseError> { @@ -409,19 +786,60 @@ impl IndexImpl for CompositeIndexImpl { } } -// TODO: Table return optimization -pub struct IndexIter<'a> { +pub struct TupleIter<'a, T: Transaction> { offset: usize, limit: Option, + table_types: Vec, + tuple_columns: Arc>, + projections: Vec, + iter: T::IterType<'a>, +} + +impl Iter for TupleIter<'_, T> { + fn next_tuple(&mut self) -> Result, DatabaseError> { + while self.offset > 0 { + let _ = self.iter.try_next()?; + self.offset -= 1; + } - params: IndexImplParams<'a>, + if let Some(num) = self.limit { + if num == 0 { + return Ok(None); + } + } + + #[allow(clippy::never_loop)] + while let Some((_, value)) = self.iter.try_next()? 
{ + let tuple = TableCodec::decode_tuple( + &self.table_types, + &self.projections, + &self.tuple_columns, + &value, + ); + + if let Some(num) = self.limit.as_mut() { + num.sub_assign(1); + } + + return Ok(Some(tuple)); + } + + Ok(None) + } +} + +pub struct IndexIter<'a, T: Transaction> { + offset: usize, + limit: Option, + + params: IndexImplParams<'a, T>, inner: IndexImplEnum, // for buffering data ranges: VecDeque, - scope_iter: Option>, + scope_iter: Option>, } -impl IndexIter<'_> { +impl IndexIter<'_, T> { fn offset_move(offset: &mut usize) -> bool { if *offset > 0 { offset.sub_assign(1); @@ -444,7 +862,7 @@ impl IndexIter<'_> { } /// expression -> index value -> tuple -impl Iter for IndexIter<'_> { +impl Iter for IndexIter<'_, T> { fn next_tuple(&mut self) -> Result, DatabaseError> { if matches!(self.limit, Some(0)) || self.is_empty() { self.scope_iter = None; @@ -454,16 +872,14 @@ impl Iter for IndexIter<'_> { } if let Some(iter) = &mut self.scope_iter { - while let Some((_, value_option)) = iter.try_next()? { - if let Some(bytes) = value_option { - if Self::offset_move(&mut self.offset) { - continue; - } - Self::limit_sub(&mut self.limit); - let tuple = self.inner.index_lookup(&bytes, &self.params)?; - - return Ok(Some(tuple)); + while let Some((_, bytes)) = iter.try_next()? { + if Self::offset_move(&mut self.offset) { + continue; } + Self::limit_sub(&mut self.limit); + let tuple = self.inner.index_lookup(&bytes, &self.params)?; + + return Ok(Some(tuple)); } self.scope_iter = None; } @@ -506,7 +922,7 @@ impl Iter for IndexIter<'_> { let mut encode_max = bound_encode(max, true)?; check_bound(&mut encode_max, bound_max); - let iter = self.params.tx.iter( + let iter = self.params.tx.range( encode_min.as_ref().map(Vec::as_slice), encode_max.as_ref().map(Vec::as_slice), )?; @@ -529,6 +945,10 @@ impl Iter for IndexIter<'_> { } } +pub trait InnerIter: Sync + Send { + fn try_next(&mut self) -> Result, DatabaseError>; +} + pub trait Iter: Sync + Send { fn next_tuple(&mut self) -> Result, DatabaseError>; } diff --git a/tests/sqllogictest/src/lib.rs b/tests/sqllogictest/src/lib.rs index a6b8c505..ccc367cd 100644 --- a/tests/sqllogictest/src/lib.rs +++ b/tests/sqllogictest/src/lib.rs @@ -1,6 +1,6 @@ use fnck_sql::db::Database; use fnck_sql::errors::DatabaseError; -use fnck_sql::storage::kip::KipStorage; +use fnck_sql::storage::kipdb::KipStorage; use sqllogictest::{AsyncDB, DBOutput, DefaultColumnType}; use std::time::Instant;