Skip to content

Commit

Permalink
test: add unit tests for Transaction (#226)
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould authored Sep 29, 2024
1 parent fc1cda4 commit 7409bd7
Show file tree
Hide file tree
Showing 16 changed files with 563 additions and 78 deletions.
2 changes: 1 addition & 1 deletion src/binder/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ mod tests {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let storage = RocksStorage::new(temp_dir.path())?;
let transaction = storage.transaction()?;
let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let scala_functions = Default::default();
let table_functions = Default::default();

Expand Down
2 changes: 1 addition & 1 deletion src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ pub mod test {

pub fn select_sql_run<S: AsRef<str>>(sql: S) -> Result<LogicalPlan, DatabaseError> {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let storage = build_test_catalog(&table_cache, temp_dir.path())?;
let transaction = storage.transaction()?;
let scala_functions = Default::default();
Expand Down
22 changes: 14 additions & 8 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ impl DataBaseBuilder {

pub fn build(self) -> Result<Database<RocksStorage>, DatabaseError> {
let storage = RocksStorage::new(self.path)?;
let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let meta_cache = Arc::new(ShardingLruCache::new(256, 8, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(48, 4, RandomState::new())?);

Ok(Database {
storage,
Expand Down Expand Up @@ -303,7 +303,7 @@ impl<S: Storage> DBTransaction<'_, S> {
}

#[cfg(test)]
mod test {
pub(crate) mod test {
use crate::catalog::{ColumnCatalog, ColumnDesc};
use crate::db::{DataBaseBuilder, DatabaseError};
use crate::storage::{Storage, TableCache, Transaction};
Expand All @@ -314,9 +314,9 @@ mod test {
use std::sync::Arc;
use tempfile::TempDir;

fn build_table(
pub(crate) fn build_table<T: Transaction>(
table_cache: &TableCache,
mut transaction: impl Transaction,
transaction: &mut T,
) -> Result<(), DatabaseError> {
let columns = vec![
ColumnCatalog::new(
Expand All @@ -329,10 +329,14 @@ mod test {
false,
ColumnDesc::new(LogicalType::Boolean, false, false, None),
),
ColumnCatalog::new(
"c3".to_string(),
false,
ColumnDesc::new(LogicalType::Integer, false, false, None),
),
];
let _ =
transaction.create_table(table_cache, Arc::new("t1".to_string()), columns, false)?;
transaction.commit()?;

Ok(())
}
Expand All @@ -341,8 +345,10 @@ mod test {
fn test_run_sql() -> Result<(), DatabaseError> {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let database = DataBaseBuilder::path(temp_dir.path()).build()?;
let transaction = database.storage.transaction()?;
build_table(&database.table_cache, transaction)?;
let mut transaction = database.storage.transaction()?;

build_table(&database.table_cache, &mut transaction)?;
transaction.commit()?;

let batch = database.run("select * from t1")?;

Expand Down
2 changes: 1 addition & 1 deletion src/execution/ddl/add_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for AddColumn {
drop(coroutine);

for tuple in tuples {
throw!(transaction.append(table_name, tuple, &types, true));
throw!(transaction.append_tuple(table_name, tuple, &types, true));
}
let col_id =
throw!(transaction.add_column(cache.0, table_name, column, *if_not_exists));
Expand Down
2 changes: 1 addition & 1 deletion src/execution/ddl/drop_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropColumn {
}
drop(coroutine);
for tuple in tuples {
throw!(transaction.append(&table_name, tuple, &types, true));
throw!(transaction.append_tuple(&table_name, tuple, &types, true));
}
throw!(transaction.drop_column(cache.0, cache.1, &table_name, &column_name));

Expand Down
2 changes: 1 addition & 1 deletion src/execution/dml/copy_from_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CopyFromFile {
let handle = thread::spawn(|| self.read_file_blocking(tx));
let mut size = 0_usize;
while let Ok(chunk) = rx.recv() {
throw!(transaction.append(&table_name, chunk, &types, false));
throw!(transaction.append_tuple(&table_name, chunk, &types, false));
size += 1;
}
throw!(handle.join().unwrap());
Expand Down
6 changes: 4 additions & 2 deletions src/execution/dml/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Delete {
);
}
}
tuple_ids.push(tuple.id.unwrap());
if let Some(tuple_id) = tuple.id {
tuple_ids.push(tuple_id);
}
}
drop(coroutine);
for (
Expand All @@ -95,7 +97,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Delete {
}
}
for tuple_id in tuple_ids {
throw!(transaction.delete(&table_name, tuple_id));
throw!(transaction.remove_tuple(&table_name, &tuple_id));
}
yield Ok(TupleBuilder::build_result("1".to_string()));
},
Expand Down
2 changes: 1 addition & 1 deletion src/execution/dml/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Insert {
}
}
for tuple in tuples {
throw!(transaction.append(&table_name, tuple, &types, is_overwrite));
throw!(transaction.append_tuple(&table_name, tuple, &types, is_overwrite));
}
}
yield Ok(TupleBuilder::build_result("1".to_string()));
Expand Down
4 changes: 2 additions & 2 deletions src/execution/dml/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Update {
if column.desc.is_primary {
let old_key = tuple.id.replace(value.clone()).unwrap();

throw!(transaction.delete(&table_name, old_key));
throw!(transaction.remove_tuple(&table_name, &old_key));
is_overwrite = false;
}
tuple.values[i] = value.clone();
Expand All @@ -115,7 +115,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Update {
));
}

throw!(transaction.append(&table_name, tuple, &types, is_overwrite));
throw!(transaction.append_tuple(&table_name, tuple, &types, is_overwrite));
}
}
yield Ok(TupleBuilder::build_result("1".to_string()));
Expand Down
4 changes: 2 additions & 2 deletions src/execution/dql/aggregate/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ mod test {

#[test]
fn test_hash_agg() -> Result<(), DatabaseError> {
let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);

let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let storage = RocksStorage::new(temp_dir.path()).unwrap();
Expand Down
16 changes: 8 additions & 8 deletions src/execution/dql/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,8 +528,8 @@ mod test {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let storage = RocksStorage::new(temp_dir.path())?;
let transaction = storage.transaction()?;
let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let (keys, left, right) = build_join_values();

let op = JoinOperator {
Expand Down Expand Up @@ -566,8 +566,8 @@ mod test {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let storage = RocksStorage::new(temp_dir.path())?;
let transaction = storage.transaction()?;
let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let (keys, left, right) = build_join_values();

let op = JoinOperator {
Expand Down Expand Up @@ -645,8 +645,8 @@ mod test {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let storage = RocksStorage::new(temp_dir.path())?;
let transaction = storage.transaction()?;
let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let (keys, left, right) = build_join_values();

let op = JoinOperator {
Expand Down Expand Up @@ -687,8 +687,8 @@ mod test {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let storage = RocksStorage::new(temp_dir.path())?;
let transaction = storage.transaction()?;
let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let (keys, left, right) = build_join_values();

let op = JoinOperator {
Expand Down
36 changes: 18 additions & 18 deletions src/execution/dql/join/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,8 +536,8 @@ mod test {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let storage = RocksStorage::new(temp_dir.path())?;
let transaction = storage.transaction()?;
let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let (keys, left, right, filter) = build_join_values(true);
let op = JoinOperator {
on: JoinCondition::On {
Expand All @@ -564,8 +564,8 @@ mod test {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let storage = RocksStorage::new(temp_dir.path())?;
let transaction = storage.transaction()?;
let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let (keys, left, right, filter) = build_join_values(true);
let op = JoinOperator {
on: JoinCondition::On {
Expand Down Expand Up @@ -604,8 +604,8 @@ mod test {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let storage = RocksStorage::new(temp_dir.path())?;
let transaction = storage.transaction()?;
let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let (keys, left, right, filter) = build_join_values(true);
let op = JoinOperator {
on: JoinCondition::On {
Expand Down Expand Up @@ -633,8 +633,8 @@ mod test {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let storage = RocksStorage::new(temp_dir.path())?;
let transaction = storage.transaction()?;
let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let (keys, left, right, _) = build_join_values(true);
let op = JoinOperator {
on: JoinCondition::On {
Expand Down Expand Up @@ -665,8 +665,8 @@ mod test {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let storage = RocksStorage::new(temp_dir.path())?;
let transaction = storage.transaction()?;
let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let (keys, left, right, _) = build_join_values(false);
let op = JoinOperator {
on: JoinCondition::On {
Expand All @@ -689,8 +689,8 @@ mod test {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let storage = RocksStorage::new(temp_dir.path())?;
let transaction = storage.transaction()?;
let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let (keys, left, right, filter) = build_join_values(true);
let op = JoinOperator {
on: JoinCondition::On {
Expand All @@ -716,8 +716,8 @@ mod test {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let storage = RocksStorage::new(temp_dir.path())?;
let transaction = storage.transaction()?;
let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let (keys, left, right, filter) = build_join_values(true);
let op = JoinOperator {
on: JoinCondition::On {
Expand Down Expand Up @@ -745,8 +745,8 @@ mod test {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let storage = RocksStorage::new(temp_dir.path())?;
let transaction = storage.transaction()?;
let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let (keys, left, right, filter) = build_join_values(true);
let op = JoinOperator {
on: JoinCondition::On {
Expand Down Expand Up @@ -779,8 +779,8 @@ mod test {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let storage = RocksStorage::new(temp_dir.path())?;
let transaction = storage.transaction()?;
let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let (keys, left, right, filter) = build_join_values(true);
let op = JoinOperator {
on: JoinCondition::On {
Expand Down
Loading

0 comments on commit 7409bd7

Please sign in to comment.