From 7409bd7783498d0db8a4c84a813078de122f898e Mon Sep 17 00:00:00 2001 From: Kould <2435992353@qq.com> Date: Sun, 29 Sep 2024 20:15:26 +0800 Subject: [PATCH] test: add unit tests for `Transaction` (#226) --- src/binder/create_table.rs | 2 +- src/binder/mod.rs | 2 +- src/db.rs | 22 +- src/execution/ddl/add_column.rs | 2 +- src/execution/ddl/drop_column.rs | 2 +- src/execution/dml/copy_from_file.rs | 2 +- src/execution/dml/delete.rs | 6 +- src/execution/dml/insert.rs | 2 +- src/execution/dml/update.rs | 4 +- src/execution/dql/aggregate/hash_agg.rs | 4 +- src/execution/dql/join/hash_join.rs | 16 +- src/execution/dql/join/nested_loop_join.rs | 36 +- src/storage/mod.rs | 501 ++++++++++++++++++++- src/storage/rocksdb.rs | 6 +- src/types/index.rs | 2 +- tests/slt/crdb/join.slt | 32 +- 16 files changed, 563 insertions(+), 78 deletions(-) diff --git a/src/binder/create_table.rs b/src/binder/create_table.rs index 903dd88c..a2aeef83 100644 --- a/src/binder/create_table.rs +++ b/src/binder/create_table.rs @@ -157,7 +157,7 @@ mod tests { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; - let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let scala_functions = Default::default(); let table_functions = Default::default(); diff --git a/src/binder/mod.rs b/src/binder/mod.rs index 9a5535c2..aa6a6ce1 100644 --- a/src/binder/mod.rs +++ b/src/binder/mod.rs @@ -445,7 +445,7 @@ pub mod test { pub fn select_sql_run>(sql: S) -> Result { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); - let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let storage = build_test_catalog(&table_cache, temp_dir.path())?; let transaction = storage.transaction()?; let scala_functions = Default::default(); diff --git a/src/db.rs b/src/db.rs index 729bcdeb..37ae4937 100644 --- a/src/db.rs +++ b/src/db.rs @@ -69,8 +69,8 @@ impl DataBaseBuilder { pub fn build(self) -> Result, DatabaseError> { let storage = RocksStorage::new(self.path)?; - let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); - let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let meta_cache = Arc::new(ShardingLruCache::new(256, 8, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(48, 4, RandomState::new())?); Ok(Database { storage, @@ -303,7 +303,7 @@ impl DBTransaction<'_, S> { } #[cfg(test)] -mod test { +pub(crate) mod test { use crate::catalog::{ColumnCatalog, ColumnDesc}; use crate::db::{DataBaseBuilder, DatabaseError}; use crate::storage::{Storage, TableCache, Transaction}; @@ -314,9 +314,9 @@ mod test { use std::sync::Arc; use tempfile::TempDir; - fn build_table( + pub(crate) fn build_table( table_cache: &TableCache, - mut transaction: impl Transaction, + transaction: &mut T, ) -> Result<(), DatabaseError> { let columns = vec![ ColumnCatalog::new( @@ -329,10 +329,14 @@ mod test { false, ColumnDesc::new(LogicalType::Boolean, false, false, None), ), + ColumnCatalog::new( + "c3".to_string(), + false, + ColumnDesc::new(LogicalType::Integer, false, false, None), + ), ]; let _ = transaction.create_table(table_cache, Arc::new("t1".to_string()), columns, false)?; - transaction.commit()?; Ok(()) } @@ -341,8 +345,10 
@@ mod test { fn test_run_sql() -> Result<(), DatabaseError> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let database = DataBaseBuilder::path(temp_dir.path()).build()?; - let transaction = database.storage.transaction()?; - build_table(&database.table_cache, transaction)?; + let mut transaction = database.storage.transaction()?; + + build_table(&database.table_cache, &mut transaction)?; + transaction.commit()?; let batch = database.run("select * from t1")?; diff --git a/src/execution/ddl/add_column.rs b/src/execution/ddl/add_column.rs index ccfe0b63..863d2efc 100644 --- a/src/execution/ddl/add_column.rs +++ b/src/execution/ddl/add_column.rs @@ -68,7 +68,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for AddColumn { drop(coroutine); for tuple in tuples { - throw!(transaction.append(table_name, tuple, &types, true)); + throw!(transaction.append_tuple(table_name, tuple, &types, true)); } let col_id = throw!(transaction.add_column(cache.0, table_name, column, *if_not_exists)); diff --git a/src/execution/ddl/drop_column.rs b/src/execution/ddl/drop_column.rs index 216493f3..73802989 100644 --- a/src/execution/ddl/drop_column.rs +++ b/src/execution/ddl/drop_column.rs @@ -67,7 +67,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropColumn { } drop(coroutine); for tuple in tuples { - throw!(transaction.append(&table_name, tuple, &types, true)); + throw!(transaction.append_tuple(&table_name, tuple, &types, true)); } throw!(transaction.drop_column(cache.0, cache.1, &table_name, &column_name)); diff --git a/src/execution/dml/copy_from_file.rs b/src/execution/dml/copy_from_file.rs index 0c5b5949..171a9413 100644 --- a/src/execution/dml/copy_from_file.rs +++ b/src/execution/dml/copy_from_file.rs @@ -42,7 +42,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CopyFromFile { let handle = thread::spawn(|| self.read_file_blocking(tx)); let mut size = 0_usize; while let Ok(chunk) = rx.recv() { - throw!(transaction.append(&table_name, chunk, &types, false)); + throw!(transaction.append_tuple(&table_name, chunk, &types, false)); size += 1; } throw!(handle.join().unwrap()); diff --git a/src/execution/dml/delete.rs b/src/execution/dml/delete.rs index 3dfd6458..9eaa9c62 100644 --- a/src/execution/dml/delete.rs +++ b/src/execution/dml/delete.rs @@ -74,7 +74,9 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Delete { ); } } - tuple_ids.push(tuple.id.unwrap()); + if let Some(tuple_id) = tuple.id { + tuple_ids.push(tuple_id); + } } drop(coroutine); for ( @@ -95,7 +97,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Delete { } } for tuple_id in tuple_ids { - throw!(transaction.delete(&table_name, tuple_id)); + throw!(transaction.remove_tuple(&table_name, &tuple_id)); } yield Ok(TupleBuilder::build_result("1".to_string())); }, diff --git a/src/execution/dml/insert.rs b/src/execution/dml/insert.rs index 8e7507da..fbb32d9a 100644 --- a/src/execution/dml/insert.rs +++ b/src/execution/dml/insert.rs @@ -118,7 +118,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Insert { } } for tuple in tuples { - throw!(transaction.append(&table_name, tuple, &types, is_overwrite)); + throw!(transaction.append_tuple(&table_name, tuple, &types, is_overwrite)); } } yield Ok(TupleBuilder::build_result("1".to_string())); diff --git a/src/execution/dml/update.rs b/src/execution/dml/update.rs index caaca0f5..bb26690f 100644 --- a/src/execution/dml/update.rs +++ b/src/execution/dml/update.rs @@ -98,7 +98,7 @@ impl<'a, T: Transaction + 'a> 
WriteExecutor<'a, T> for Update { if column.desc.is_primary { let old_key = tuple.id.replace(value.clone()).unwrap(); - throw!(transaction.delete(&table_name, old_key)); + throw!(transaction.remove_tuple(&table_name, &old_key)); is_overwrite = false; } tuple.values[i] = value.clone(); @@ -115,7 +115,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Update { )); } - throw!(transaction.append(&table_name, tuple, &types, is_overwrite)); + throw!(transaction.append_tuple(&table_name, tuple, &types, is_overwrite)); } } yield Ok(TupleBuilder::build_result("1".to_string())); diff --git a/src/execution/dql/aggregate/hash_agg.rs b/src/execution/dql/aggregate/hash_agg.rs index f3810b92..c75f8199 100644 --- a/src/execution/dql/aggregate/hash_agg.rs +++ b/src/execution/dql/aggregate/hash_agg.rs @@ -183,8 +183,8 @@ mod test { #[test] fn test_hash_agg() -> Result<(), DatabaseError> { - let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); - let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path()).unwrap(); diff --git a/src/execution/dql/join/hash_join.rs b/src/execution/dql/join/hash_join.rs index be8d4cd1..08e51b37 100644 --- a/src/execution/dql/join/hash_join.rs +++ b/src/execution/dql/join/hash_join.rs @@ -528,8 +528,8 @@ mod test { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; - let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); - let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let (keys, left, right) = build_join_values(); let op = JoinOperator { @@ -566,8 +566,8 @@ mod test { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; - let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); - let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let (keys, left, right) = build_join_values(); let op = JoinOperator { @@ -645,8 +645,8 @@ mod test { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; - let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); - let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let (keys, left, right) = build_join_values(); let op = JoinOperator { @@ -687,8 +687,8 @@ mod test { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; let 
transaction = storage.transaction()?; - let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); - let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let (keys, left, right) = build_join_values(); let op = JoinOperator { diff --git a/src/execution/dql/join/nested_loop_join.rs b/src/execution/dql/join/nested_loop_join.rs index dae1e90f..6f11a010 100644 --- a/src/execution/dql/join/nested_loop_join.rs +++ b/src/execution/dql/join/nested_loop_join.rs @@ -536,8 +536,8 @@ mod test { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; - let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); - let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let (keys, left, right, filter) = build_join_values(true); let op = JoinOperator { on: JoinCondition::On { @@ -564,8 +564,8 @@ mod test { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; - let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); - let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let (keys, left, right, filter) = build_join_values(true); let op = JoinOperator { on: JoinCondition::On { @@ -604,8 +604,8 @@ mod test { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; - let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); - let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let (keys, left, right, filter) = build_join_values(true); let op = JoinOperator { on: JoinCondition::On { @@ -633,8 +633,8 @@ mod test { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; - let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); - let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let (keys, left, right, _) = build_join_values(true); let op = JoinOperator { on: JoinCondition::On { @@ -665,8 +665,8 @@ mod test { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; - let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); - let table_cache = 
Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let (keys, left, right, _) = build_join_values(false); let op = JoinOperator { on: JoinCondition::On { @@ -689,8 +689,8 @@ mod test { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; - let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); - let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let (keys, left, right, filter) = build_join_values(true); let op = JoinOperator { on: JoinCondition::On { @@ -716,8 +716,8 @@ mod test { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; - let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); - let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let (keys, left, right, filter) = build_join_values(true); let op = JoinOperator { on: JoinCondition::On { @@ -745,8 +745,8 @@ mod test { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; - let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); - let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let (keys, left, right, filter) = build_join_values(true); let op = JoinOperator { on: JoinCondition::On { @@ -779,8 +779,8 @@ mod test { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; - let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); - let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let (keys, left, right, filter) = build_join_values(true); let op = JoinOperator { on: JoinCondition::On { diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 097ab10e..fdb1a860 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -181,7 +181,7 @@ pub trait Transaction: Sized { Ok(()) } - fn append( + fn append_tuple( &mut self, table_name: &str, tuple: Tuple, @@ -198,8 +198,8 @@ pub trait Transaction: Sized { Ok(()) } - fn delete(&mut self, table_name: &str, tuple_id: TupleId) -> Result<(), DatabaseError> { - let key = TableCodec::encode_tuple_key(table_name, &tuple_id)?; + fn remove_tuple(&mut self, table_name: &str, tuple_id: &TupleId) -> Result<(), DatabaseError> { + let key = TableCodec::encode_tuple_key(table_name, tuple_id)?; 
self.remove(&key)?; Ok(()) @@ -302,7 +302,7 @@ pub trait Transaction: Sized { } return Err(DatabaseError::TableExists); } - self.create_index_meta_for_table(&mut table_catalog)?; + self.create_index_meta_from_column(&mut table_catalog)?; self.set(table_key, value)?; for column in table_catalog.columns() { @@ -335,9 +335,6 @@ pub trait Transaction: Sized { let (index_meta_min, index_meta_max) = TableCodec::index_meta_bound(table_name.as_str()); self._drop_data(&index_meta_min, &index_meta_max)?; - let (statistics_min, statistics_max) = TableCodec::statistics_bound(table_name.as_str()); - self._drop_data(&statistics_min, &statistics_max)?; - self.remove(&TableCodec::encode_root_table_key(table_name.as_str()))?; table_cache.remove(&table_name); @@ -351,6 +348,9 @@ pub trait Transaction: Sized { let (index_min, index_max) = TableCodec::all_index_bound(table_name); self._drop_data(&index_min, &index_max)?; + let (statistics_min, statistics_max) = TableCodec::statistics_bound(table_name); + self._drop_data(&statistics_min, &statistics_max)?; + Ok(()) } @@ -469,7 +469,7 @@ pub trait Transaction: Sized { Ok(()) } - fn create_index_meta_for_table( + fn create_index_meta_from_column( &mut self, table: &mut TableCatalog, ) -> Result<(), DatabaseError> { @@ -978,3 +978,488 @@ pub trait InnerIter { pub trait Iter { fn next_tuple(&mut self) -> Result, DatabaseError>; } + +#[cfg(test)] +mod test { + use crate::catalog::{ColumnCatalog, ColumnDesc, ColumnRef, ColumnSummary, TableCatalog}; + use crate::db::test::build_table; + use crate::errors::DatabaseError; + use crate::expression::range_detacher::Range; + use crate::storage::rocksdb::{RocksStorage, RocksTransaction}; + use crate::storage::table_codec::TableCodec; + use crate::storage::{ + IndexIter, InnerIter, Iter, StatisticsMetaCache, Storage, TableCache, Transaction, + }; + use crate::types::index::{Index, IndexMeta, IndexType}; + use crate::types::tuple::Tuple; + use crate::types::value::DataValue; + use crate::types::LogicalType; + use crate::utils::lru::ShardingLruCache; + use std::collections::Bound; + use std::hash::RandomState; + use std::slice; + use std::sync::Arc; + use tempfile::TempDir; + + fn full_columns() -> Vec<(usize, ColumnRef)> { + vec![ + ( + 0, + Arc::new(ColumnCatalog::new( + "c1".to_string(), + false, + ColumnDesc::new(LogicalType::Integer, true, false, None), + )), + ), + ( + 1, + Arc::new(ColumnCatalog::new( + "c2".to_string(), + false, + ColumnDesc::new(LogicalType::Boolean, false, false, None), + )), + ), + ( + 2, + Arc::new(ColumnCatalog::new( + "c3".to_string(), + false, + ColumnDesc::new(LogicalType::Integer, false, false, None), + )), + ), + ] + } + fn build_tuples() -> Vec { + vec![ + Tuple { + id: Some(Arc::new(DataValue::Int32(Some(0)))), + values: vec![ + Arc::new(DataValue::Int32(Some(0))), + Arc::new(DataValue::Boolean(Some(true))), + Arc::new(DataValue::Int32(Some(0))), + ], + }, + Tuple { + id: Some(Arc::new(DataValue::Int32(Some(1)))), + values: vec![ + Arc::new(DataValue::Int32(Some(1))), + Arc::new(DataValue::Boolean(Some(true))), + Arc::new(DataValue::Int32(Some(1))), + ], + }, + Tuple { + id: Some(Arc::new(DataValue::Int32(Some(2)))), + values: vec![ + Arc::new(DataValue::Int32(Some(2))), + Arc::new(DataValue::Boolean(Some(false))), + Arc::new(DataValue::Int32(Some(0))), + ], + }, + ] + } + + #[test] + fn test_table_create_drop() -> Result<(), DatabaseError> { + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); + let storage = RocksStorage::new(temp_dir.path())?; + let mut 
transaction = storage.transaction()?; + let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + + build_table(&table_cache, &mut transaction)?; + + let fn_assert = |transaction: &mut RocksTransaction, + table_cache: &TableCache| + -> Result<(), DatabaseError> { + let table = transaction + .table(&table_cache, Arc::new("t1".to_string())) + .unwrap(); + assert_eq!(table.name.as_str(), "t1"); + assert_eq!(table.indexes.len(), 1); + + let primary_key_index_meta = &table.indexes[0]; + assert_eq!(primary_key_index_meta.id, 0); + assert_eq!(primary_key_index_meta.column_ids, vec![0]); + assert_eq!( + primary_key_index_meta.table_name, + Arc::new("t1".to_string()) + ); + assert_eq!(primary_key_index_meta.pk_ty, LogicalType::Integer); + assert_eq!(primary_key_index_meta.name, "pk_c1".to_string()); + assert_eq!(primary_key_index_meta.ty, IndexType::PrimaryKey); + + let mut column_iter = table.columns(); + let c1_column = column_iter.next().unwrap(); + assert_eq!(c1_column.nullable, false); + assert_eq!( + c1_column.summary(), + &ColumnSummary { + id: Some(0), + name: "c1".to_string(), + table_name: Some(Arc::new("t1".to_string())), + } + ); + assert_eq!( + c1_column.desc, + ColumnDesc::new(LogicalType::Integer, true, false, None) + ); + + let c2_column = column_iter.next().unwrap(); + assert_eq!(c2_column.nullable, false); + assert_eq!( + c2_column.summary(), + &ColumnSummary { + id: Some(1), + name: "c2".to_string(), + table_name: Some(Arc::new("t1".to_string())), + } + ); + assert_eq!( + c2_column.desc, + ColumnDesc::new(LogicalType::Boolean, false, false, None) + ); + + let c3_column = column_iter.next().unwrap(); + assert_eq!(c3_column.nullable, false); + assert_eq!( + c3_column.summary(), + &ColumnSummary { + id: Some(2), + name: "c3".to_string(), + table_name: Some(Arc::new("t1".to_string())), + } + ); + assert_eq!( + c3_column.desc, + ColumnDesc::new(LogicalType::Integer, false, false, None) + ); + + Ok(()) + }; + fn_assert(&mut transaction, &table_cache)?; + fn_assert( + &mut transaction, + &Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?), + )?; + + Ok(()) + } + + #[test] + fn test_tuple_append_delete() -> Result<(), DatabaseError> { + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); + let storage = RocksStorage::new(temp_dir.path())?; + let mut transaction = storage.transaction()?; + let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + + assert!(transaction + .add_index_meta( + &table_cache, + &Arc::new("t1".to_string()), + "i1".to_string(), + vec![2], + IndexType::Normal + ) + .is_err()); + + build_table(&table_cache, &mut transaction)?; + + let tuples = build_tuples(); + for tuple in tuples.iter().cloned() { + transaction.append_tuple( + "t1", + tuple, + &[ + LogicalType::Integer, + LogicalType::Boolean, + LogicalType::Integer, + ], + false, + )?; + } + { + let mut tuple_iter = transaction.read( + &table_cache, + Arc::new("t1".to_string()), + (None, None), + full_columns(), + )?; + + assert_eq!(tuple_iter.next_tuple()?.unwrap(), tuples[0]); + assert_eq!(tuple_iter.next_tuple()?.unwrap(), tuples[1]); + assert_eq!(tuple_iter.next_tuple()?.unwrap(), tuples[2]); + + let (min, max) = TableCodec::tuple_bound("t1"); + let mut iter = transaction.range(Bound::Included(&min), Bound::Included(&max))?; + + let (_, value) = iter.try_next()?.unwrap(); + dbg!(value); + let (_, value) = iter.try_next()?.unwrap(); + dbg!(value); + let (_, value) = iter.try_next()?.unwrap(); + dbg!(value); + 
assert!(iter.try_next()?.is_none()); + } + + transaction.remove_tuple("t1", &tuples[1].values[0])?; + { + let mut tuple_iter = transaction.read( + &table_cache, + Arc::new("t1".to_string()), + (None, None), + full_columns(), + )?; + + assert_eq!(tuple_iter.next_tuple()?.unwrap(), tuples[0]); + assert_eq!(tuple_iter.next_tuple()?.unwrap(), tuples[2]); + + let (min, max) = TableCodec::tuple_bound("t1"); + let mut iter = transaction.range(Bound::Included(&min), Bound::Included(&max))?; + + let (_, value) = iter.try_next()?.unwrap(); + dbg!(value); + let (_, value) = iter.try_next()?.unwrap(); + dbg!(value); + assert!(iter.try_next()?.is_none()); + } + + Ok(()) + } + + #[test] + fn test_add_index_meta() -> Result<(), DatabaseError> { + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); + let storage = RocksStorage::new(temp_dir.path())?; + let mut transaction = storage.transaction()?; + let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + + assert!(transaction + .add_index_meta( + &table_cache, + &Arc::new("t1".to_string()), + "i1".to_string(), + vec![2], + IndexType::Normal + ) + .is_err()); + + build_table(&table_cache, &mut transaction)?; + + let _ = transaction.add_index_meta( + &table_cache, + &Arc::new("t1".to_string()), + "i1".to_string(), + vec![2], + IndexType::Normal, + )?; + let _ = transaction.add_index_meta( + &table_cache, + &Arc::new("t1".to_string()), + "i2".to_string(), + vec![2, 1], + IndexType::Composite, + )?; + + let fn_assert = |transaction: &mut RocksTransaction, + table_cache: &TableCache| + -> Result<(), DatabaseError> { + let table = transaction + .table(&table_cache, Arc::new("t1".to_string())) + .unwrap(); + + let i1_meta = table.indexes[1].clone(); + assert_eq!(i1_meta.id, 1); + assert_eq!(i1_meta.column_ids, vec![2]); + assert_eq!(i1_meta.table_name, Arc::new("t1".to_string())); + assert_eq!(i1_meta.pk_ty, LogicalType::Integer); + assert_eq!(i1_meta.name, "i1".to_string()); + assert_eq!(i1_meta.ty, IndexType::Normal); + + let i2_meta = table.indexes[2].clone(); + assert_eq!(i2_meta.id, 2); + assert_eq!(i2_meta.column_ids, vec![2, 1]); + assert_eq!(i2_meta.table_name, Arc::new("t1".to_string())); + assert_eq!(i2_meta.pk_ty, LogicalType::Integer); + assert_eq!(i2_meta.name, "i2".to_string()); + assert_eq!(i2_meta.ty, IndexType::Composite); + + Ok(()) + }; + fn_assert(&mut transaction, &table_cache)?; + fn_assert( + &mut transaction, + &Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?), + )?; + { + let (min, max) = TableCodec::index_meta_bound("t1"); + let mut iter = transaction.range(Bound::Included(&min), Bound::Included(&max))?; + + let (_, value) = iter.try_next()?.unwrap(); + dbg!(value); + let (_, value) = iter.try_next()?.unwrap(); + dbg!(value); + let (_, value) = iter.try_next()?.unwrap(); + dbg!(value); + assert!(iter.try_next()?.is_none()); + } + + Ok(()) + } + + #[test] + fn test_index_insert_delete() -> Result<(), DatabaseError> { + fn build_index_iter<'a>( + transaction: &'a RocksTransaction<'a>, + table_cache: &'a Arc>, + ) -> Result>, DatabaseError> { + transaction.read_by_index( + &table_cache, + Arc::new("t1".to_string()), + (None, None), + full_columns(), + Arc::new(IndexMeta { + id: 1, + column_ids: vec![2], + table_name: Arc::new("t1".to_string()), + pk_ty: LogicalType::Integer, + name: "i1".to_string(), + ty: IndexType::Normal, + }), + vec![Range::Scope { + min: Bound::Unbounded, + max: Bound::Unbounded, + }], + ) + } + + let temp_dir = TempDir::new().expect("unable to create 
temporary working directory"); + let storage = RocksStorage::new(temp_dir.path())?; + let mut transaction = storage.transaction()?; + let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + + build_table(&table_cache, &mut transaction)?; + + let _ = transaction.add_index_meta( + &table_cache, + &Arc::new("t1".to_string()), + "i1".to_string(), + vec![2], + IndexType::Normal, + )?; + + let tuples = build_tuples(); + let indexes = vec![ + ( + Arc::new(DataValue::Int32(Some(0))), + Index::new(1, slice::from_ref(&tuples[0].values[2]), IndexType::Normal), + ), + ( + Arc::new(DataValue::Int32(Some(1))), + Index::new(1, slice::from_ref(&tuples[1].values[2]), IndexType::Normal), + ), + ( + Arc::new(DataValue::Int32(Some(2))), + Index::new(1, slice::from_ref(&tuples[2].values[2]), IndexType::Normal), + ), + ]; + for (tuple_id, index) in indexes.iter().cloned() { + transaction.add_index("t1", index, &tuple_id)?; + } + for tuple in tuples.iter().cloned() { + transaction.append_tuple( + "t1", + tuple, + &[ + LogicalType::Integer, + LogicalType::Boolean, + LogicalType::Integer, + ], + false, + )?; + } + { + let mut index_iter = build_index_iter(&transaction, &table_cache)?; + + assert_eq!(index_iter.next_tuple()?.unwrap(), tuples[0]); + assert_eq!(index_iter.next_tuple()?.unwrap(), tuples[2]); + assert_eq!(index_iter.next_tuple()?.unwrap(), tuples[1]); + + let (min, max) = TableCodec::index_bound("t1", &1); + let mut iter = transaction.range(Bound::Included(&min), Bound::Included(&max))?; + + let (_, value) = iter.try_next()?.unwrap(); + dbg!(value); + let (_, value) = iter.try_next()?.unwrap(); + dbg!(value); + let (_, value) = iter.try_next()?.unwrap(); + dbg!(value); + assert!(iter.try_next()?.is_none()); + } + transaction.del_index("t1", &indexes[0].1, Some(&indexes[0].0))?; + + let mut index_iter = build_index_iter(&transaction, &table_cache)?; + + assert_eq!(index_iter.next_tuple()?.unwrap(), tuples[2]); + assert_eq!(index_iter.next_tuple()?.unwrap(), tuples[1]); + + let (min, max) = TableCodec::index_bound("t1", &1); + let mut iter = transaction.range(Bound::Included(&min), Bound::Included(&max))?; + + let (_, value) = iter.try_next()?.unwrap(); + dbg!(value); + let (_, value) = iter.try_next()?.unwrap(); + dbg!(value); + assert!(iter.try_next()?.is_none()); + + Ok(()) + } + + #[test] + fn test_column_add_drop() -> Result<(), DatabaseError> { + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); + let storage = RocksStorage::new(temp_dir.path())?; + let mut transaction = storage.transaction()?; + let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + let meta_cache = StatisticsMetaCache::new(4, 1, RandomState::new())?; + + build_table(&table_cache, &mut transaction)?; + let table_name = Arc::new("t1".to_string()); + + let new_column = ColumnCatalog::new( + "c4".to_string(), + true, + ColumnDesc::new(LogicalType::Integer, false, false, None), + ); + let new_column_id = + transaction.add_column(&table_cache, &table_name, &new_column, false)?; + { + assert!(transaction + .add_column(&table_cache, &table_name, &new_column, false,) + .is_err()); + assert_eq!( + new_column_id, + transaction.add_column(&table_cache, &table_name, &new_column, true,)? 
+ ); + } + { + let table = transaction.table(&table_cache, table_name.clone()).unwrap(); + assert!(table.contains_column("c4")); + + let mut new_column = ColumnCatalog::new( + "c4".to_string(), + true, + ColumnDesc::new(LogicalType::Integer, false, false, None), + ); + new_column.set_table_name(table_name.clone()); + new_column.set_id(3); + assert_eq!(table.get_column_by_name("c4"), Some(&Arc::new(new_column))); + } + transaction.drop_column(&table_cache, &meta_cache, &table_name, "c4")?; + { + let table = transaction.table(&table_cache, table_name.clone()).unwrap(); + assert!(!table.contains_column("c4")); + assert!(table.get_column_by_name("c4").is_none()); + } + + Ok(()) + } +} diff --git a/src/storage/rocksdb.rs b/src/storage/rocksdb.rs index 0568887b..0de513f8 100644 --- a/src/storage/rocksdb.rs +++ b/src/storage/rocksdb.rs @@ -153,7 +153,7 @@ mod test { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; let mut transaction = storage.transaction()?; - let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let columns = Arc::new(vec![ Arc::new(ColumnCatalog::new( "c1".to_string(), @@ -185,7 +185,7 @@ mod test { .get_column_id_by_name(&"c1".to_string()) .is_some()); - transaction.append( + transaction.append_tuple( &"test".to_string(), Tuple { id: Some(Arc::new(DataValue::Int32(Some(1)))), @@ -197,7 +197,7 @@ mod test { &[LogicalType::Integer, LogicalType::Boolean], false, )?; - transaction.append( + transaction.append_tuple( &"test".to_string(), Tuple { id: Some(Arc::new(DataValue::Int32(Some(2)))), diff --git a/src/types/index.rs b/src/types/index.rs index 72e44bb7..d1c6db90 100644 --- a/src/types/index.rs +++ b/src/types/index.rs @@ -57,7 +57,7 @@ impl IndexMeta { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Index<'a> { pub id: IndexId, pub column_values: &'a [ValueRef], diff --git a/tests/slt/crdb/join.slt b/tests/slt/crdb/join.slt index 9d7acc08..1eb5acc8 100644 --- a/tests/slt/crdb/join.slt +++ b/tests/slt/crdb/join.slt @@ -167,15 +167,12 @@ null 16 null 42 null 43 -# TODO: Full Join on nested loop join # query # SELECT * FROM onecolumn AS a FULL OUTER JOIN othercolumn AS b USING(x) ORDER BY x -# TODO: Full Join on nested loop join # query # SELECT x AS s, a.x, b.x FROM onecolumn AS a FULL OUTER JOIN othercolumn AS b USING(x) ORDER BY s -# TODO: Full Join on nested loop join # query # SELECT * FROM onecolumn AS a NATURAL FULL OUTER JOIN othercolumn AS b ORDER BY x @@ -244,27 +241,23 @@ null null 1 null statement ok SELECT * FROM empty AS a FULL OUTER JOIN onecolumn AS b USING(x) ORDER BY x -# TODO: Full Join on nested loop join -# query II -# SELECT * FROM onecolumn AS a(x) FULL OUTER JOIN empty AS b(y) ON a.x = b.y ORDER BY a.x -# ---- -# 42 NULL -# 44 NULL -# NULL NULL +query II +SELECT * FROM onecolumn AS a(aid, x) FULL OUTER JOIN empty AS b(bid, y) ON a.x = b.y ORDER BY a.x +---- +null null 2 42 +null null 0 44 +null null 1 null -# TODO: Full Join on nested loop join # query # SELECT * FROM onecolumn AS a FULL OUTER JOIN empty AS b USING(x) ORDER BY x -# TODO: Full Join on nested loop join -# query II -# SELECT * FROM empty AS a(x) FULL OUTER JOIN onecolumn AS b(y) ON a.x = b.y ORDER BY b.y -# ---- -# NULL 42 -# NULL 44 -# NULL NULL +query II +SELECT * FROM empty AS a(aid, x) FULL OUTER JOIN onecolumn AS b(bid, y) ON a.x = b.y ORDER BY b.y +---- +null null 2 42 +null null 0 
44 +null null 1 null -# TODO: Full Join on nested loop join # query # SELECT * FROM empty AS a FULL OUTER JOIN onecolumn AS b USING(x) ORDER BY x @@ -317,7 +310,6 @@ SELECT o.x, t.y FROM onecolumn o LEFT OUTER JOIN twocolumn t ON (o.x=t.x AND t.x 44 51 null null -# TODO: Full Join on nested loop join # query # SELECT * FROM (SELECT x, 2 two FROM onecolumn) NATURAL FULL JOIN (SELECT x, y+1 plus1 FROM twocolumn)
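
For reference, a minimal sketch of the write path exercised by the new tests, using the
Transaction API as renamed in this patch (`append` -> `append_tuple`; `delete` ->
`remove_tuple`, which now borrows the `TupleId`). It reuses the `build_table` helper and
the `t1(c1 int primary key, c2 bool, c3 int)` schema from `src/db.rs` above; the function
name `append_then_remove` is illustrative only:

    use crate::db::test::build_table;
    use crate::errors::DatabaseError;
    use crate::storage::rocksdb::RocksStorage;
    use crate::storage::{Storage, Transaction};
    use crate::types::tuple::Tuple;
    use crate::types::value::DataValue;
    use crate::types::LogicalType;
    use crate::utils::lru::ShardingLruCache;
    use std::hash::RandomState;
    use std::sync::Arc;
    use tempfile::TempDir;

    fn append_then_remove() -> Result<(), DatabaseError> {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let storage = RocksStorage::new(temp_dir.path())?;
        let mut transaction = storage.transaction()?;
        let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);

        // Creates `t1` with columns c1 (int, primary key), c2 (bool), c3 (int).
        build_table(&table_cache, &mut transaction)?;

        let tuple = Tuple {
            id: Some(Arc::new(DataValue::Int32(Some(0)))),
            values: vec![
                Arc::new(DataValue::Int32(Some(0))),
                Arc::new(DataValue::Boolean(Some(true))),
                Arc::new(DataValue::Int32(Some(0))),
            ],
        };
        // `append_tuple` (formerly `append`) takes the tuple by value plus its column types.
        transaction.append_tuple(
            "t1",
            tuple,
            &[LogicalType::Integer, LogicalType::Boolean, LogicalType::Integer],
            false,
        )?;
        // `remove_tuple` (formerly `delete`) now borrows the primary-key TupleId.
        transaction.remove_tuple("t1", &Arc::new(DataValue::Int32(Some(0))))?;
        transaction.commit()?;
        Ok(())
    }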