From ed06a350c15176c553b5026cd9b1ce94680e7f2c Mon Sep 17 00:00:00 2001 From: Kould Date: Sat, 17 Aug 2024 20:54:26 +0800 Subject: [PATCH] perf: strip `table_cache` and `meta_cache` out of storage & optimize `RocksIter` --- Cargo.toml | 2 +- benchmarks/query_benchmark.rs | 2 +- src/binder/create_table.rs | 10 ++- src/binder/expr.rs | 3 +- src/binder/mod.rs | 30 ++++++--- src/binder/select.rs | 3 +- src/db.rs | 75 +++++++++++++++------ src/execution/ddl/add_column.rs | 14 ++-- src/execution/ddl/create_index.rs | 37 ++++++---- src/execution/ddl/create_table.rs | 16 +++-- src/execution/ddl/drop_column.rs | 12 ++-- src/execution/ddl/drop_table.rs | 10 ++- src/execution/ddl/truncate.rs | 8 ++- src/execution/dml/analyze.rs | 14 ++-- src/execution/dml/copy_from_file.rs | 11 ++- src/execution/dml/delete.rs | 12 ++-- src/execution/dml/insert.rs | 13 ++-- src/execution/dml/update.rs | 15 +++-- src/execution/dql/aggregate/hash_agg.rs | 22 ++++-- src/execution/dql/aggregate/simple_agg.rs | 10 ++- src/execution/dql/describe.rs | 10 ++- src/execution/dql/dummy.rs | 4 +- src/execution/dql/explain.rs | 4 +- src/execution/dql/filter.rs | 10 ++- src/execution/dql/index_scan.rs | 17 ++++- src/execution/dql/join/hash_join.rs | 38 ++++++++--- src/execution/dql/join/nested_loop_join.rs | 54 +++++++++++---- src/execution/dql/limit.rs | 10 ++- src/execution/dql/projection.rs | 10 ++- src/execution/dql/seq_scan.rs | 12 +++- src/execution/dql/show_table.rs | 8 ++- src/execution/dql/sort.rs | 10 ++- src/execution/dql/union.rs | 12 ++-- src/execution/dql/values.rs | 4 +- src/execution/mod.rs | 78 +++++++++++++--------- src/optimizer/core/memo.rs | 13 +++- src/storage/mod.rs | 71 +++++++++++--------- src/storage/rocksdb.rs | 62 ++++++++--------- src/utils/bit_vector.rs | 5 ++ src/utils/lru.rs | 4 ++ 40 files changed, 509 insertions(+), 246 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1d806ac6..c557485d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ [package] name = "fnck_sql" -version = "0.0.2" +version = "0.0.3" edition = "2021" authors = ["Kould ", "Xwg "] description = "SQL as a Function for Rust" diff --git a/benchmarks/query_benchmark.rs b/benchmarks/query_benchmark.rs index 2aadd162..2a943fc8 100644 --- a/benchmarks/query_benchmark.rs +++ b/benchmarks/query_benchmark.rs @@ -11,7 +11,7 @@ use std::path::Path; const QUERY_BENCH_FNCK_SQL_PATH: &'static str = "./fncksql_bench"; const QUERY_BENCH_SQLITE_PATH: &'static str = "./sqlite_bench"; -const TABLE_ROW_NUM: u64 = 2_00_000; +const TABLE_ROW_NUM: u64 = 200_000; fn query_cases() -> Vec<(&'static str, &'static str)> { vec![ diff --git a/src/binder/create_table.rs b/src/binder/create_table.rs index f7060304..17de1010 100644 --- a/src/binder/create_table.rs +++ b/src/binder/create_table.rs @@ -146,7 +146,9 @@ mod tests { use crate::storage::rocksdb::RocksStorage; use crate::storage::Storage; use crate::types::LogicalType; + use crate::utils::lru::ShardingLruCache; use sqlparser::ast::CharLengthUnits; + use std::hash::RandomState; use std::sync::atomic::AtomicUsize; use tempfile::TempDir; @@ -155,11 +157,17 @@ mod tests { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; + let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); let functions = Default::default(); let sql = "create table t1 (id int primary key, name varchar(10) null)"; let mut binder = Binder::new( - BinderContext::new(&transaction, &functions, Arc::new(AtomicUsize::new(0))), + BinderContext::new( + &table_cache, + &transaction, + &functions, + Arc::new(AtomicUsize::new(0)), + ), None, ); let stmt = crate::parser::parse_sql(sql).unwrap(); diff --git a/src/binder/expr.rs b/src/binder/expr.rs index 0a3d3609..9d35ba3d 100644 --- a/src/binder/expr.rs +++ b/src/binder/expr.rs @@ -248,13 +248,14 @@ impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> { subquery: &Query, ) -> Result<(LogicalPlan, Arc), DatabaseError> { let BinderContext { + table_cache, transaction, functions, temp_table_id, .. } = &self.context; let mut binder = Binder::new( - BinderContext::new(*transaction, functions, temp_table_id.clone()), + BinderContext::new(table_cache, *transaction, functions, temp_table_id.clone()), Some(self), ); let mut sub_query = binder.bind_query(subquery)?; diff --git a/src/binder/mod.rs b/src/binder/mod.rs index 205acc3c..becf09b9 100644 --- a/src/binder/mod.rs +++ b/src/binder/mod.rs @@ -27,7 +27,7 @@ use crate::errors::DatabaseError; use crate::expression::ScalarExpression; use crate::planner::operator::join::JoinType; use crate::planner::LogicalPlan; -use crate::storage::Transaction; +use crate::storage::{TableCache, Transaction}; pub enum InputRefType { AggCall, @@ -83,6 +83,7 @@ pub enum SubQueryType { #[derive(Clone)] pub struct BinderContext<'a, T: Transaction> { pub(crate) functions: &'a Functions, + pub(crate) table_cache: &'a TableCache, pub(crate) transaction: &'a T, // Tips: When there are multiple tables and Wildcard, use BTreeMap to ensure that the order of the output tables is certain. pub(crate) bind_table: @@ -105,12 +106,14 @@ pub struct BinderContext<'a, T: Transaction> { impl<'a, T: Transaction> BinderContext<'a, T> { pub fn new( + table_cache: &'a TableCache, transaction: &'a T, functions: &'a Functions, temp_table_id: Arc, ) -> Self { BinderContext { functions, + table_cache, transaction, bind_table: Default::default(), expr_aliases: Default::default(), @@ -157,9 +160,9 @@ impl<'a, T: Transaction> BinderContext<'a, T> { pub fn table(&self, table_name: TableName) -> Option<&TableCatalog> { if let Some(real_name) = self.table_aliases.get(table_name.as_ref()) { - self.transaction.table(real_name.clone()) + self.transaction.table(self.table_cache, real_name.clone()) } else { - self.transaction.table(table_name) + self.transaction.table(self.table_cache, table_name) } } @@ -170,9 +173,9 @@ impl<'a, T: Transaction> BinderContext<'a, T> { join_type: Option, ) -> Result<&TableCatalog, DatabaseError> { let table = if let Some(real_name) = self.table_aliases.get(table_name.as_ref()) { - self.transaction.table(real_name.clone()) + self.transaction.table(self.table_cache, real_name.clone()) } else { - self.transaction.table(table_name.clone()) + self.transaction.table(self.table_cache, table_name.clone()) } .ok_or(DatabaseError::TableNotFound)?; @@ -380,20 +383,24 @@ pub mod test { use crate::errors::DatabaseError; use crate::planner::LogicalPlan; use crate::storage::rocksdb::RocksStorage; - use crate::storage::{Storage, Transaction}; + use crate::storage::{Storage, TableCache, Transaction}; use crate::types::LogicalType::Integer; + use crate::utils::lru::ShardingLruCache; + use std::hash::RandomState; use std::path::PathBuf; use std::sync::atomic::AtomicUsize; use std::sync::Arc; use tempfile::TempDir; pub(crate) fn build_test_catalog( + table_cache: &TableCache, path: impl Into + Send, ) -> Result { let storage = RocksStorage::new(path)?; let mut transaction = storage.transaction()?; let _ = transaction.create_table( + table_cache, Arc::new("t1".to_string()), vec![ ColumnCatalog::new( @@ -411,6 +418,7 @@ pub mod test { )?; let _ = transaction.create_table( + table_cache, Arc::new("t2".to_string()), vec![ ColumnCatalog::new( @@ -434,11 +442,17 @@ pub mod test { pub fn select_sql_run>(sql: S) -> Result { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); - let storage = build_test_catalog(temp_dir.path())?; + let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let storage = build_test_catalog(&table_cache, temp_dir.path())?; let transaction = storage.transaction()?; let functions = Default::default(); let mut binder = Binder::new( - BinderContext::new(&transaction, &functions, Arc::new(AtomicUsize::new(0))), + BinderContext::new( + &table_cache, + &transaction, + &functions, + Arc::new(AtomicUsize::new(0)), + ), None, ); let stmt = crate::parser::parse_sql(sql)?; diff --git a/src/binder/select.rs b/src/binder/select.rs index 94ab2300..fe703978 100644 --- a/src/binder/select.rs +++ b/src/binder/select.rs @@ -494,13 +494,14 @@ impl<'a: 'b, 'b, T: Transaction> Binder<'a, 'b, T> { _ => unimplemented!(), }; let BinderContext { + table_cache, transaction, functions, temp_table_id, .. } = &self.context; let mut binder = Binder::new( - BinderContext::new(*transaction, functions, temp_table_id.clone()), + BinderContext::new(table_cache, *transaction, functions, temp_table_id.clone()), Some(self), ); let mut right = binder.bind_single_table_ref(relation, Some(join_type))?; diff --git a/src/db.rs b/src/db.rs index d6f8df8c..d38986ce 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,4 +1,5 @@ use crate::binder::{command_type, Binder, BinderContext, CommandType}; +use crate::catalog::TableCatalog; use crate::errors::DatabaseError; use crate::execution::{build_write, try_collect}; use crate::expression::function::{FunctionSummary, ScalarFunctionImpl}; @@ -9,12 +10,14 @@ use crate::optimizer::rule::normalization::NormalizationRuleImpl; use crate::parser::parse_sql; use crate::planner::LogicalPlan; use crate::storage::rocksdb::RocksStorage; -use crate::storage::{Storage, Transaction}; +use crate::storage::{StatisticsMetaCache, Storage, TableCache, Transaction}; use crate::types::tuple::{SchemaRef, Tuple}; use crate::udf::current_date::CurrentDate; +use crate::utils::lru::ShardingLruCache; use ahash::HashMap; use parking_lot::{ArcRwLockReadGuard, ArcRwLockWriteGuard, RawRwLock, RwLock}; use sqlparser::ast::Statement; +use std::hash::RandomState; use std::path::PathBuf; use std::sync::atomic::AtomicUsize; use std::sync::Arc; @@ -51,19 +54,25 @@ impl DataBaseBuilder { self = self.register_function(CurrentDate::new()); let storage = RocksStorage::new(self.path)?; + let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); Ok(Database { storage, functions: Arc::new(self.functions), mdl: Arc::new(RwLock::new(())), + meta_cache, + table_cache, }) } } pub struct Database { - pub storage: S, + pub(crate) storage: S, functions: Arc, mdl: Arc>, + pub(crate) meta_cache: Arc, + pub(crate) table_cache: Arc>, } impl Database { @@ -80,18 +89,21 @@ impl Database { } else { MetaDataLock::Read(self.mdl.read_arc()) }; - let transaction = self.storage.transaction()?; - let plan = Self::build_plan(stmt, &transaction, &self.functions)?; - - Self::run_volcano(transaction, plan) - } + let mut transaction = self.storage.transaction()?; + let mut plan = Self::build_plan( + stmt, + &self.table_cache, + &self.meta_cache, + &transaction, + &self.functions, + )?; - pub(crate) fn run_volcano( - mut transaction: ::TransactionType<'_>, - mut plan: LogicalPlan, - ) -> Result<(SchemaRef, Vec), DatabaseError> { let schema = plan.output_schema().clone(); - let iterator = build_write(plan, &mut transaction); + let iterator = build_write( + plan, + (&self.table_cache, &self.meta_cache), + &mut transaction, + ); let tuples = try_collect(iterator)?; transaction.commit()?; @@ -107,16 +119,25 @@ impl Database { inner: transaction, functions: self.functions.clone(), _guard: guard, + meta_cache: self.meta_cache.clone(), + table_cache: self.table_cache.clone(), }) } pub(crate) fn build_plan( stmt: &Statement, + table_cache: &TableCache, + meta_cache: &StatisticsMetaCache, transaction: &::TransactionType<'_>, functions: &Functions, ) -> Result { let mut binder = Binder::new( - BinderContext::new(transaction, functions, Arc::new(AtomicUsize::new(0))), + BinderContext::new( + table_cache, + transaction, + functions, + Arc::new(AtomicUsize::new(0)), + ), None, ); /// Build a logical plan. @@ -129,8 +150,8 @@ impl Database { let source_plan = binder.bind(stmt)?; // println!("source_plan plan: {:#?}", source_plan); - let best_plan = - Self::default_optimizer(source_plan).find_best(Some(&transaction.meta_loader()))?; + let best_plan = Self::default_optimizer(source_plan) + .find_best(Some(&transaction.meta_loader(meta_cache)))?; // println!("best_plan plan: {:#?}", best_plan); Ok(best_plan) @@ -221,6 +242,8 @@ pub struct DBTransaction<'a, S: Storage + 'a> { inner: S::TransactionType<'a>, functions: Arc, _guard: ArcRwLockReadGuard, + pub(crate) meta_cache: Arc, + pub(crate) table_cache: Arc>, } impl DBTransaction<'_, S> { @@ -235,10 +258,16 @@ impl DBTransaction<'_, S> { "`DDL` is not allowed to execute within a transaction".to_string(), )); } - let mut plan = Database::::build_plan(stmt, &self.inner, &self.functions)?; + let mut plan = Database::::build_plan( + stmt, + &self.table_cache, + &self.meta_cache, + &self.inner, + &self.functions, + )?; let schema = plan.output_schema().clone(); - let executor = build_write(plan, &mut self.inner); + let executor = build_write(plan, (&self.table_cache, &self.meta_cache), &mut self.inner); Ok((schema, try_collect(executor)?)) } @@ -258,7 +287,7 @@ mod test { use crate::expression::ScalarExpression; use crate::expression::{BinaryOperator, UnaryOperator}; use crate::function; - use crate::storage::{Storage, Transaction}; + use crate::storage::{Storage, TableCache, Transaction}; use crate::types::evaluator::EvaluatorFactory; use crate::types::tuple::{create_table, Tuple}; use crate::types::value::{DataValue, ValueRef}; @@ -268,7 +297,10 @@ mod test { use std::sync::Arc; use tempfile::TempDir; - fn build_table(mut transaction: impl Transaction) -> Result<(), DatabaseError> { + fn build_table( + table_cache: &TableCache, + mut transaction: impl Transaction, + ) -> Result<(), DatabaseError> { let columns = vec![ ColumnCatalog::new( "c1".to_string(), @@ -281,7 +313,8 @@ mod test { ColumnDesc::new(LogicalType::Boolean, false, false, None), ), ]; - let _ = transaction.create_table(Arc::new("t1".to_string()), columns, false)?; + let _ = + transaction.create_table(table_cache, Arc::new("t1".to_string()), columns, false)?; transaction.commit()?; Ok(()) @@ -292,7 +325,7 @@ mod test { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let database = DataBaseBuilder::path(temp_dir.path()).build()?; let transaction = database.storage.transaction()?; - build_table(transaction)?; + build_table(&database.table_cache, transaction)?; let batch = database.run("select * from t1")?; diff --git a/src/execution/ddl/add_column.rs b/src/execution/ddl/add_column.rs index 0fe6d6c2..ccfe0b63 100644 --- a/src/execution/ddl/add_column.rs +++ b/src/execution/ddl/add_column.rs @@ -1,5 +1,6 @@ use crate::execution::{build_read, Executor, WriteExecutor}; use crate::planner::LogicalPlan; +use crate::storage::{StatisticsMetaCache, TableCache}; use crate::types::index::{Index, IndexType}; use crate::types::tuple::Tuple; use crate::types::tuple_builder::TupleBuilder; @@ -25,7 +26,11 @@ impl From<(AddColumnOperator, LogicalPlan)> for AddColumn { } impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for AddColumn { - fn execute_mut(mut self, transaction: &'a mut T) -> Executor<'a> { + fn execute_mut( + mut self, + cache: (&'a TableCache, &'a StatisticsMetaCache), + transaction: &'a mut T, + ) -> Executor<'a> { Box::new( #[coroutine] move || { @@ -45,7 +50,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for AddColumn { } types.push(*column.datatype()); - let mut coroutine = build_read(self.input, transaction); + let mut coroutine = build_read(self.input, cache, transaction); while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { let mut tuple: Tuple = throw!(tuple); @@ -65,13 +70,14 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for AddColumn { for tuple in tuples { throw!(transaction.append(table_name, tuple, &types, true)); } - let col_id = throw!(transaction.add_column(table_name, column, *if_not_exists)); + let col_id = + throw!(transaction.add_column(cache.0, table_name, column, *if_not_exists)); // Unique Index if let (Some(unique_values), Some(unique_meta)) = ( unique_values, transaction - .table(table_name.clone()) + .table(cache.0, table_name.clone()) .and_then(|table| table.get_unique_index(&col_id)) .cloned(), ) { diff --git a/src/execution/ddl/create_index.rs b/src/execution/ddl/create_index.rs index b2128dc0..94c6dd13 100644 --- a/src/execution/ddl/create_index.rs +++ b/src/execution/ddl/create_index.rs @@ -4,7 +4,7 @@ use crate::execution::{build_read, Executor, WriteExecutor}; use crate::expression::ScalarExpression; use crate::planner::operator::create_index::CreateIndexOperator; use crate::planner::LogicalPlan; -use crate::storage::Transaction; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; use crate::throw; use crate::types::index::Index; use crate::types::tuple::Tuple; @@ -26,7 +26,11 @@ impl From<(CreateIndexOperator, LogicalPlan)> for CreateIndex { } impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CreateIndex { - fn execute_mut(mut self, transaction: &'a mut T) -> Executor<'a> { + fn execute_mut( + mut self, + cache: (&'a TableCache, &'a StatisticsMetaCache), + transaction: &'a mut T, + ) -> Executor<'a> { Box::new( #[coroutine] move || { @@ -47,20 +51,25 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CreateIndex { }) .unzip(); let schema = self.input.output_schema().clone(); - let index_id = - match transaction.add_index_meta(&table_name, index_name, column_ids, ty) { - Ok(index_id) => index_id, - Err(DatabaseError::DuplicateIndex(index_name)) => { - if if_not_exists { - return; - } else { - throw!(Err(DatabaseError::DuplicateIndex(index_name))) - } + let index_id = match transaction.add_index_meta( + cache.0, + &table_name, + index_name, + column_ids, + ty, + ) { + Ok(index_id) => index_id, + Err(DatabaseError::DuplicateIndex(index_name)) => { + if if_not_exists { + return; + } else { + throw!(Err(DatabaseError::DuplicateIndex(index_name))) } - err => throw!(err), - }; + } + err => throw!(err), + }; let mut index_values = Vec::new(); - let mut coroutine = build_read(self.input, transaction); + let mut coroutine = build_read(self.input, cache, transaction); while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { let mut tuple: Tuple = throw!(tuple); diff --git a/src/execution/ddl/create_table.rs b/src/execution/ddl/create_table.rs index c7e987a0..1ad9555b 100644 --- a/src/execution/ddl/create_table.rs +++ b/src/execution/ddl/create_table.rs @@ -1,6 +1,6 @@ use crate::execution::{Executor, WriteExecutor}; use crate::planner::operator::create_table::CreateTableOperator; -use crate::storage::Transaction; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; use crate::throw; use crate::types::tuple_builder::TupleBuilder; @@ -15,7 +15,11 @@ impl From for CreateTable { } impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CreateTable { - fn execute_mut(self, transaction: &'a mut T) -> Executor<'a> { + fn execute_mut( + self, + (table_cache, _): (&'a TableCache, &'a StatisticsMetaCache), + transaction: &'a mut T, + ) -> Executor<'a> { Box::new( #[coroutine] move || { @@ -25,8 +29,12 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CreateTable { if_not_exists, } = self.op; - let _ = - throw!(transaction.create_table(table_name.clone(), columns, if_not_exists)); + let _ = throw!(transaction.create_table( + table_cache, + table_name.clone(), + columns, + if_not_exists + )); yield Ok(TupleBuilder::build_result(format!("{}", table_name))); }, diff --git a/src/execution/ddl/drop_column.rs b/src/execution/ddl/drop_column.rs index cb8a0e88..c6e47ae7 100644 --- a/src/execution/ddl/drop_column.rs +++ b/src/execution/ddl/drop_column.rs @@ -2,7 +2,7 @@ use crate::errors::DatabaseError; use crate::execution::{build_read, Executor, WriteExecutor}; use crate::planner::operator::alter_table::drop_column::DropColumnOperator; use crate::planner::LogicalPlan; -use crate::storage::Transaction; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; use crate::throw; use crate::types::tuple::Tuple; use crate::types::tuple_builder::TupleBuilder; @@ -22,7 +22,11 @@ impl From<(DropColumnOperator, LogicalPlan)> for DropColumn { } impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropColumn { - fn execute_mut(mut self, transaction: &'a mut T) -> Executor<'a> { + fn execute_mut( + mut self, + cache: (&'a TableCache, &'a StatisticsMetaCache), + transaction: &'a mut T, + ) -> Executor<'a> { Box::new( #[coroutine] move || { @@ -53,7 +57,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropColumn { } types.push(*column_ref.datatype()); } - let mut coroutine = build_read(self.input, transaction); + let mut coroutine = build_read(self.input, cache, transaction); while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { let mut tuple: Tuple = throw!(tuple); @@ -65,7 +69,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropColumn { for tuple in tuples { throw!(transaction.append(&table_name, tuple, &types, true)); } - throw!(transaction.drop_column(&table_name, &column_name)); + throw!(transaction.drop_column(cache.0, &table_name, &column_name)); yield Ok(TupleBuilder::build_result("1".to_string())); } else if if_exists { diff --git a/src/execution/ddl/drop_table.rs b/src/execution/ddl/drop_table.rs index b38bb820..3e949bf2 100644 --- a/src/execution/ddl/drop_table.rs +++ b/src/execution/ddl/drop_table.rs @@ -1,6 +1,6 @@ use crate::execution::{Executor, WriteExecutor}; use crate::planner::operator::drop_table::DropTableOperator; -use crate::storage::Transaction; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; use crate::throw; use crate::types::tuple_builder::TupleBuilder; @@ -15,7 +15,11 @@ impl From for DropTable { } impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropTable { - fn execute_mut(self, transaction: &'a mut T) -> Executor<'a> { + fn execute_mut( + self, + (table_cache, _): (&'a TableCache, &'a StatisticsMetaCache), + transaction: &'a mut T, + ) -> Executor<'a> { Box::new( #[coroutine] move || { @@ -24,7 +28,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropTable { if_exists, } = self.op; - throw!(transaction.drop_table(&table_name, if_exists)); + throw!(transaction.drop_table(table_cache, table_name.clone(), if_exists)); yield Ok(TupleBuilder::build_result(format!("{}", table_name))); }, diff --git a/src/execution/ddl/truncate.rs b/src/execution/ddl/truncate.rs index c0160d7e..0d57ac1e 100644 --- a/src/execution/ddl/truncate.rs +++ b/src/execution/ddl/truncate.rs @@ -1,6 +1,6 @@ use crate::execution::{Executor, WriteExecutor}; use crate::planner::operator::truncate::TruncateOperator; -use crate::storage::Transaction; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; use crate::throw; use crate::types::tuple_builder::TupleBuilder; @@ -15,7 +15,11 @@ impl From for Truncate { } impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Truncate { - fn execute_mut(self, transaction: &'a mut T) -> Executor<'a> { + fn execute_mut( + self, + _: (&'a TableCache, &'a StatisticsMetaCache), + transaction: &'a mut T, + ) -> Executor<'a> { Box::new( #[coroutine] move || { diff --git a/src/execution/dml/analyze.rs b/src/execution/dml/analyze.rs index 9d5d31ff..c8a4e2b6 100644 --- a/src/execution/dml/analyze.rs +++ b/src/execution/dml/analyze.rs @@ -6,7 +6,7 @@ use crate::optimizer::core::histogram::HistogramBuilder; use crate::optimizer::core::statistics_meta::StatisticsMeta; use crate::planner::operator::analyze::AnalyzeOperator; use crate::planner::LogicalPlan; -use crate::storage::Transaction; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; use crate::throw; use crate::types::index::IndexMetaRef; use crate::types::tuple::Tuple; @@ -49,7 +49,11 @@ impl From<(AnalyzeOperator, LogicalPlan)> for Analyze { } impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Analyze { - fn execute_mut(self, transaction: &'a mut T) -> Executor<'a> { + fn execute_mut( + self, + cache: (&'a TableCache, &'a StatisticsMetaCache), + transaction: &'a mut T, + ) -> Executor<'a> { Box::new( #[coroutine] move || { @@ -62,7 +66,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Analyze { let schema = input.output_schema().clone(); let mut builders = Vec::with_capacity(index_metas.len()); let table = throw!(transaction - .table(table_name.clone()) + .table(cache.0, table_name.clone()) .cloned() .ok_or(DatabaseError::TableNotFound)); @@ -74,7 +78,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Analyze { )); } - let mut coroutine = build_read(input, transaction); + let mut coroutine = build_read(input, cache, transaction); while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { let tuple = throw!(tuple); @@ -113,7 +117,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Analyze { ty: Utf8Type::Variable(None), unit: CharLengthUnits::Characters, })); - throw!(transaction.save_table_meta(&table_name, path, meta)); + throw!(transaction.save_table_meta(cache.1, &table_name, path, meta)); } yield Ok(Tuple { id: None, values }); }, diff --git a/src/execution/dml/copy_from_file.rs b/src/execution/dml/copy_from_file.rs index ef3ffce6..2b642663 100644 --- a/src/execution/dml/copy_from_file.rs +++ b/src/execution/dml/copy_from_file.rs @@ -2,7 +2,7 @@ use crate::binder::copy::FileFormat; use crate::errors::DatabaseError; use crate::execution::{Executor, WriteExecutor}; use crate::planner::operator::copy_from_file::CopyFromFileOperator; -use crate::storage::Transaction; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; use crate::throw; use crate::types::tuple::{types, Tuple}; use crate::types::tuple_builder::TupleBuilder; @@ -24,7 +24,11 @@ impl From for CopyFromFile { } impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CopyFromFile { - fn execute_mut(self, transaction: &'a mut T) -> Executor<'a> { + fn execute_mut( + self, + _: (&'a TableCache, &'a StatisticsMetaCache), + transaction: &'a mut T, + ) -> Executor<'a> { Box::new( #[coroutine] move || { @@ -185,7 +189,8 @@ mod tests { let storage = db.storage; let mut transaction = storage.transaction()?; - let mut coroutine = executor.execute_mut(&mut transaction); + let mut coroutine = + executor.execute_mut((&db.table_cache, &db.meta_cache), &mut transaction); let tuple = match Pin::new(&mut coroutine).resume(()) { CoroutineState::Yielded(tuple) => tuple, CoroutineState::Complete(()) => unreachable!(), diff --git a/src/execution/dml/delete.rs b/src/execution/dml/delete.rs index 346e582f..3dfd6458 100644 --- a/src/execution/dml/delete.rs +++ b/src/execution/dml/delete.rs @@ -5,7 +5,7 @@ use crate::execution::{build_read, Executor, WriteExecutor}; use crate::expression::ScalarExpression; use crate::planner::operator::delete::DeleteOperator; use crate::planner::LogicalPlan; -use crate::storage::Transaction; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; use crate::throw; use crate::types::index::{Index, IndexId, IndexType}; use crate::types::tuple::Tuple; @@ -28,7 +28,11 @@ impl From<(DeleteOperator, LogicalPlan)> for Delete { } impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Delete { - fn execute_mut(self, transaction: &'a mut T) -> Executor<'a> { + fn execute_mut( + self, + cache: (&'a TableCache, &'a StatisticsMetaCache), + transaction: &'a mut T, + ) -> Executor<'a> { Box::new( #[coroutine] move || { @@ -39,13 +43,13 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Delete { let schema = input.output_schema().clone(); let table = throw!(transaction - .table(table_name.clone()) + .table(cache.0, table_name.clone()) .cloned() .ok_or(DatabaseError::TableNotFound)); let mut tuple_ids = Vec::new(); let mut indexes: HashMap = HashMap::new(); - let mut coroutine = build_read(input, transaction); + let mut coroutine = build_read(input, cache, transaction); while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { let tuple: Tuple = throw!(tuple); diff --git a/src/execution/dml/insert.rs b/src/execution/dml/insert.rs index 5b279d37..8e7507da 100644 --- a/src/execution/dml/insert.rs +++ b/src/execution/dml/insert.rs @@ -4,7 +4,7 @@ use crate::execution::dql::projection::Projection; use crate::execution::{build_read, Executor, WriteExecutor}; use crate::planner::operator::insert::InsertOperator; use crate::planner::LogicalPlan; -use crate::storage::Transaction; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; use crate::throw; use crate::types::index::Index; use crate::types::tuple::Tuple; @@ -41,7 +41,11 @@ impl From<(InsertOperator, LogicalPlan)> for Insert { } impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Insert { - fn execute_mut(self, transaction: &'a mut T) -> Executor<'a> { + fn execute_mut( + self, + cache: (&'a TableCache, &'a StatisticsMetaCache), + transaction: &'a mut T, + ) -> Executor<'a> { Box::new( #[coroutine] move || { @@ -60,9 +64,10 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Insert { .map(|col| col.id()) .ok_or_else(|| DatabaseError::NotNull)); - if let Some(table_catalog) = transaction.table(table_name.clone()).cloned() { + if let Some(table_catalog) = transaction.table(cache.0, table_name.clone()).cloned() + { let types = table_catalog.types(); - let mut coroutine = build_read(input, transaction); + let mut coroutine = build_read(input, cache, transaction); while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { let Tuple { values, .. } = throw!(tuple); diff --git a/src/execution/dml/update.rs b/src/execution/dml/update.rs index 084983c4..caaca0f5 100644 --- a/src/execution/dml/update.rs +++ b/src/execution/dml/update.rs @@ -3,7 +3,7 @@ use crate::execution::dql::projection::Projection; use crate::execution::{build_read, Executor, WriteExecutor}; use crate::planner::operator::update::UpdateOperator; use crate::planner::LogicalPlan; -use crate::storage::Transaction; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; use crate::throw; use crate::types::index::Index; use crate::types::tuple::types; @@ -33,7 +33,11 @@ impl From<(UpdateOperator, LogicalPlan, LogicalPlan)> for Update { } impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Update { - fn execute_mut(self, transaction: &'a mut T) -> Executor<'a> { + fn execute_mut( + self, + cache: (&'a TableCache, &'a StatisticsMetaCache), + transaction: &'a mut T, + ) -> Executor<'a> { Box::new( #[coroutine] move || { @@ -47,12 +51,13 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Update { let input_schema = input.output_schema().clone(); let types = types(&input_schema); - if let Some(table_catalog) = transaction.table(table_name.clone()).cloned() { + if let Some(table_catalog) = transaction.table(cache.0, table_name.clone()).cloned() + { let mut value_map = HashMap::new(); let mut tuples = Vec::new(); // only once - let mut coroutine = build_read(values, transaction); + let mut coroutine = build_read(values, cache, transaction); while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { let Tuple { values, .. } = throw!(tuple); @@ -61,7 +66,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Update { } } drop(coroutine); - let mut coroutine = build_read(input, transaction); + let mut coroutine = build_read(input, cache, transaction); while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { let tuple: Tuple = throw!(tuple); diff --git a/src/execution/dql/aggregate/hash_agg.rs b/src/execution/dql/aggregate/hash_agg.rs index d191d979..d31572f5 100644 --- a/src/execution/dql/aggregate/hash_agg.rs +++ b/src/execution/dql/aggregate/hash_agg.rs @@ -5,7 +5,7 @@ use crate::execution::{build_read, Executor, ReadExecutor}; use crate::expression::ScalarExpression; use crate::planner::operator::aggregate::AggregateOperator; use crate::planner::LogicalPlan; -use crate::storage::Transaction; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; use crate::throw; use crate::types::tuple::{SchemaRef, Tuple}; use crate::types::value::ValueRef; @@ -40,10 +40,14 @@ impl From<(AggregateOperator, LogicalPlan)> for HashAggExecutor { } impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for HashAggExecutor { - fn execute(self, transaction: &'a T) -> Executor<'a> { + fn execute( + self, + cache: (&'a TableCache, &'a StatisticsMetaCache), + transaction: &'a T, + ) -> Executor<'a> { Box::new( #[coroutine] - || { + move || { let HashAggExecutor { agg_calls, groupby_exprs, @@ -53,7 +57,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for HashAggExecutor { let mut agg_status = HashAggStatus::new(input.output_schema().clone(), agg_calls, groupby_exprs); - let mut coroutine = build_read(input, transaction); + let mut coroutine = build_read(input, cache, transaction); while let CoroutineState::Yielded(result) = Pin::new(&mut coroutine).resume(()) { throw!(agg_status.update(throw!(result))); @@ -171,12 +175,17 @@ mod test { use crate::types::tuple::create_table; use crate::types::value::DataValue; use crate::types::LogicalType; + use crate::utils::lru::ShardingLruCache; use itertools::Itertools; + use std::hash::RandomState; use std::sync::Arc; use tempfile::TempDir; #[test] fn test_hash_agg() -> Result<(), DatabaseError> { + let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path()).unwrap(); let transaction = storage.transaction()?; @@ -230,7 +239,10 @@ mod test { _output_schema_ref: None, }; - let tuples = try_collect(HashAggExecutor::from((operator, input)).execute(&transaction))?; + let tuples = try_collect( + HashAggExecutor::from((operator, input)) + .execute((&table_cache, &meta_cache), &transaction), + )?; println!( "hash_agg_test: \n{}", diff --git a/src/execution/dql/aggregate/simple_agg.rs b/src/execution/dql/aggregate/simple_agg.rs index ca7c4f9f..e3d159a0 100644 --- a/src/execution/dql/aggregate/simple_agg.rs +++ b/src/execution/dql/aggregate/simple_agg.rs @@ -3,7 +3,7 @@ use crate::execution::{build_read, Executor, ReadExecutor}; use crate::expression::ScalarExpression; use crate::planner::operator::aggregate::AggregateOperator; use crate::planner::LogicalPlan; -use crate::storage::Transaction; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; use crate::throw; use crate::types::tuple::Tuple; use crate::types::value::ValueRef; @@ -26,7 +26,11 @@ impl From<(AggregateOperator, LogicalPlan)> for SimpleAggExecutor { } impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for SimpleAggExecutor { - fn execute(self, transaction: &'a T) -> Executor<'a> { + fn execute( + self, + cache: (&'a TableCache, &'a StatisticsMetaCache), + transaction: &'a T, + ) -> Executor<'a> { Box::new( #[coroutine] move || { @@ -38,7 +42,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for SimpleAggExecutor { let mut accs = throw!(create_accumulators(&agg_calls)); let schema = input.output_schema().clone(); - let mut coroutine = build_read(input, transaction); + let mut coroutine = build_read(input, cache, transaction); while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { let tuple = throw!(tuple); diff --git a/src/execution/dql/describe.rs b/src/execution/dql/describe.rs index e15fc690..0518d830 100644 --- a/src/execution/dql/describe.rs +++ b/src/execution/dql/describe.rs @@ -2,7 +2,7 @@ use crate::catalog::{ColumnCatalog, TableName}; use crate::execution::DatabaseError; use crate::execution::{Executor, ReadExecutor}; use crate::planner::operator::describe::DescribeOperator; -use crate::storage::Transaction; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; use crate::throw; use crate::types::tuple::Tuple; use crate::types::value::{DataValue, Utf8Type, ValueRef}; @@ -41,12 +41,16 @@ impl From for Describe { } impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Describe { - fn execute(self, transaction: &'a T) -> Executor<'a> { + fn execute( + self, + cache: (&'a TableCache, &'a StatisticsMetaCache), + transaction: &'a T, + ) -> Executor<'a> { Box::new( #[coroutine] move || { let table = throw!(transaction - .table(self.table_name.clone()) + .table(cache.0, self.table_name.clone()) .ok_or(DatabaseError::TableNotFound)); let key_fn = |column: &ColumnCatalog| { if column.desc.is_primary { diff --git a/src/execution/dql/dummy.rs b/src/execution/dql/dummy.rs index d3b67740..57e267c0 100644 --- a/src/execution/dql/dummy.rs +++ b/src/execution/dql/dummy.rs @@ -1,11 +1,11 @@ use crate::execution::{Executor, ReadExecutor}; -use crate::storage::Transaction; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; use crate::types::tuple::Tuple; pub struct Dummy {} impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Dummy { - fn execute(self, _: &T) -> Executor<'a> { + fn execute(self, _: (&'a TableCache, &'a StatisticsMetaCache), _: &T) -> Executor<'a> { Box::new( #[coroutine] move || { diff --git a/src/execution/dql/explain.rs b/src/execution/dql/explain.rs index b3a025c6..f1badc7b 100644 --- a/src/execution/dql/explain.rs +++ b/src/execution/dql/explain.rs @@ -1,6 +1,6 @@ use crate::execution::{Executor, ReadExecutor}; use crate::planner::LogicalPlan; -use crate::storage::Transaction; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; use crate::types::tuple::Tuple; use crate::types::value::{DataValue, Utf8Type}; use sqlparser::ast::CharLengthUnits; @@ -17,7 +17,7 @@ impl From for Explain { } impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Explain { - fn execute(self, _: &T) -> Executor<'a> { + fn execute(self, _: (&'a TableCache, &'a StatisticsMetaCache), _: &T) -> Executor<'a> { Box::new( #[coroutine] move || { diff --git a/src/execution/dql/filter.rs b/src/execution/dql/filter.rs index 42cf349a..f72098c3 100644 --- a/src/execution/dql/filter.rs +++ b/src/execution/dql/filter.rs @@ -2,7 +2,7 @@ use crate::execution::{build_read, Executor, ReadExecutor}; use crate::expression::ScalarExpression; use crate::planner::operator::filter::FilterOperator; use crate::planner::LogicalPlan; -use crate::storage::Transaction; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; use crate::throw; use std::ops::Coroutine; use std::ops::CoroutineState; @@ -20,7 +20,11 @@ impl From<(FilterOperator, LogicalPlan)> for Filter { } impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Filter { - fn execute(self, transaction: &'a T) -> Executor<'a> { + fn execute( + self, + cache: (&'a TableCache, &'a StatisticsMetaCache), + transaction: &'a T, + ) -> Executor<'a> { Box::new( #[coroutine] move || { @@ -31,7 +35,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Filter { let schema = input.output_schema().clone(); - let mut coroutine = build_read(input, transaction); + let mut coroutine = build_read(input, cache, transaction); while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { let tuple = throw!(tuple); diff --git a/src/execution/dql/index_scan.rs b/src/execution/dql/index_scan.rs index 1f50cf0e..e8a80416 100644 --- a/src/execution/dql/index_scan.rs +++ b/src/execution/dql/index_scan.rs @@ -1,7 +1,7 @@ use crate::execution::{Executor, ReadExecutor}; use crate::expression::range_detacher::Range; use crate::planner::operator::scan::ScanOperator; -use crate::storage::{Iter, Transaction}; +use crate::storage::{Iter, StatisticsMetaCache, TableCache, Transaction}; use crate::throw; use crate::types::index::IndexMetaRef; @@ -27,7 +27,11 @@ impl From<(ScanOperator, IndexMetaRef, Range)> for IndexScan { } impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for IndexScan { - fn execute(self, transaction: &'a T) -> Executor<'a> { + fn execute( + self, + (table_cache, _): (&'a TableCache, &'a StatisticsMetaCache), + transaction: &'a T, + ) -> Executor<'a> { Box::new( #[coroutine] move || { @@ -39,7 +43,14 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for IndexScan { } = self.op; let mut iter = transaction - .read_by_index(table_name, limit, columns, self.index_by, self.ranges) + .read_by_index( + table_cache, + table_name, + limit, + columns, + self.index_by, + self.ranges, + ) .unwrap(); while let Some(tuple) = throw!(iter.next_tuple()) { diff --git a/src/execution/dql/join/hash_join.rs b/src/execution/dql/join/hash_join.rs index b91602c7..c19f014e 100644 --- a/src/execution/dql/join/hash_join.rs +++ b/src/execution/dql/join/hash_join.rs @@ -5,7 +5,7 @@ use crate::execution::{build_read, Executor, ReadExecutor}; use crate::expression::ScalarExpression; use crate::planner::operator::join::{JoinCondition, JoinOperator, JoinType}; use crate::planner::LogicalPlan; -use crate::storage::Transaction; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; use crate::throw; use crate::types::tuple::{Schema, SchemaRef, Tuple}; use crate::types::value::{DataValue, ValueRef, NULL_VALUE}; @@ -42,7 +42,11 @@ impl From<(JoinOperator, LogicalPlan, LogicalPlan)> for HashJoin { } impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for HashJoin { - fn execute(self, transaction: &'a T) -> Executor<'a> { + fn execute( + self, + cache: (&'a TableCache, &'a StatisticsMetaCache), + transaction: &'a T, + ) -> Executor<'a> { Box::new( #[coroutine] move || { @@ -63,7 +67,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for HashJoin { // build phase: // 1.construct hashtable, one hash key may contains multiple rows indices. // 2.merged all left tuples. - let mut coroutine = build_read(left_input, transaction); + let mut coroutine = build_read(left_input, cache, transaction); while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { let tuple: Tuple = throw!(tuple); @@ -72,7 +76,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for HashJoin { } // probe phase - let mut coroutine = build_read(right_input, transaction); + let mut coroutine = build_read(right_input, cache, transaction); while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { let tuple: Tuple = throw!(tuple); @@ -430,6 +434,8 @@ mod test { use crate::storage::Storage; use crate::types::value::DataValue; use crate::types::LogicalType; + use crate::utils::lru::ShardingLruCache; + use std::hash::RandomState; use std::sync::Arc; use tempfile::TempDir; @@ -522,6 +528,8 @@ mod test { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; + let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); let (keys, left, right) = build_join_values(); let op = JoinOperator { @@ -531,7 +539,8 @@ mod test { }, join_type: JoinType::Inner, }; - let executor = HashJoin::from((op, left, right)).execute(&transaction); + let executor = + HashJoin::from((op, left, right)).execute((&table_cache, &meta_cache), &transaction); let tuples = try_collect(executor)?; assert_eq!(tuples.len(), 3); @@ -557,6 +566,8 @@ mod test { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; + let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); let (keys, left, right) = build_join_values(); let op = JoinOperator { @@ -569,7 +580,7 @@ mod test { //Outer { let executor = HashJoin::from((op.clone(), left.clone(), right.clone())); - let tuples = try_collect(executor.execute(&transaction))?; + let tuples = try_collect(executor.execute((&table_cache, &meta_cache), &transaction))?; assert_eq!(tuples.len(), 4); @@ -594,7 +605,8 @@ mod test { { let mut executor = HashJoin::from((op.clone(), left.clone(), right.clone())); executor.ty = JoinType::LeftSemi; - let mut tuples = try_collect(executor.execute(&transaction))?; + let mut tuples = + try_collect(executor.execute((&table_cache, &meta_cache), &transaction))?; assert_eq!(tuples.len(), 2); tuples.sort_by_key(|tuple| { @@ -616,7 +628,7 @@ mod test { { let mut executor = HashJoin::from((op, left, right)); executor.ty = JoinType::LeftAnti; - let tuples = try_collect(executor.execute(&transaction))?; + let tuples = try_collect(executor.execute((&table_cache, &meta_cache), &transaction))?; assert_eq!(tuples.len(), 1); assert_eq!( @@ -633,6 +645,8 @@ mod test { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; + let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); let (keys, left, right) = build_join_values(); let op = JoinOperator { @@ -642,7 +656,8 @@ mod test { }, join_type: JoinType::RightOuter, }; - let executor = HashJoin::from((op, left, right)).execute(&transaction); + let executor = + HashJoin::from((op, left, right)).execute((&table_cache, &meta_cache), &transaction); let tuples = try_collect(executor)?; assert_eq!(tuples.len(), 4); @@ -672,6 +687,8 @@ mod test { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; + let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); let (keys, left, right) = build_join_values(); let op = JoinOperator { @@ -681,7 +698,8 @@ mod test { }, join_type: JoinType::Full, }; - let executor = HashJoin::from((op, left, right)).execute(&transaction); + let executor = + HashJoin::from((op, left, right)).execute((&table_cache, &meta_cache), &transaction); let tuples = try_collect(executor)?; assert_eq!(tuples.len(), 5); diff --git a/src/execution/dql/join/nested_loop_join.rs b/src/execution/dql/join/nested_loop_join.rs index 00da70c1..07096d43 100644 --- a/src/execution/dql/join/nested_loop_join.rs +++ b/src/execution/dql/join/nested_loop_join.rs @@ -10,7 +10,7 @@ use crate::execution::{build_read, Executor, ReadExecutor}; use crate::expression::ScalarExpression; use crate::planner::operator::join::{JoinCondition, JoinOperator, JoinType}; use crate::planner::LogicalPlan; -use crate::storage::Transaction; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; use crate::throw; use crate::types::tuple::{Schema, SchemaRef, Tuple}; use crate::types::value::{DataValue, NULL_VALUE}; @@ -126,7 +126,11 @@ impl From<(JoinOperator, LogicalPlan, LogicalPlan)> for NestedLoopJoin { } impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for NestedLoopJoin { - fn execute(self, transaction: &'a T) -> Executor<'a> { + fn execute( + self, + cache: (&'a TableCache, &'a StatisticsMetaCache), + transaction: &'a T, + ) -> Executor<'a> { Box::new( #[coroutine] move || { @@ -144,7 +148,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for NestedLoopJoin { unreachable!("{} cannot be handled in nested loop join", self.ty) } let right_schema_len = eq_cond.right_schema.len(); - let mut left_coroutine = build_read(left_input, transaction); + let mut left_coroutine = build_read(left_input, cache, transaction); while let CoroutineState::Yielded(left_tuple) = Pin::new(&mut left_coroutine).resume(()) @@ -152,7 +156,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for NestedLoopJoin { let left_tuple: Tuple = throw!(left_tuple); let mut has_matched = false; - let mut right_coroutine = build_read(right_input.clone(), transaction); + let mut right_coroutine = build_read(right_input.clone(), cache, transaction); while let CoroutineState::Yielded(right_tuple) = Pin::new(&mut right_coroutine).resume(()) @@ -355,7 +359,9 @@ mod test { use crate::types::evaluator::BinaryEvaluatorBox; use crate::types::value::DataValue; use crate::types::LogicalType; + use crate::utils::lru::ShardingLruCache; use std::collections::HashSet; + use std::hash::RandomState; use std::sync::Arc; use tempfile::TempDir; @@ -497,6 +503,8 @@ mod test { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; + let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); let (keys, left, right, filter) = build_join_values(true); let op = JoinOperator { on: JoinCondition::On { @@ -505,7 +513,8 @@ mod test { }, join_type: JoinType::Inner, }; - let executor = NestedLoopJoin::from((op, left, right)).execute(&transaction); + let executor = NestedLoopJoin::from((op, left, right)) + .execute((&table_cache, &meta_cache), &transaction); let tuples = try_collect(executor)?; let mut expected_set = HashSet::with_capacity(1); @@ -522,6 +531,8 @@ mod test { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; + let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); let (keys, left, right, filter) = build_join_values(true); let op = JoinOperator { on: JoinCondition::On { @@ -530,7 +541,8 @@ mod test { }, join_type: JoinType::LeftOuter, }; - let executor = NestedLoopJoin::from((op, left, right)).execute(&transaction); + let executor = NestedLoopJoin::from((op, left, right)) + .execute((&table_cache, &meta_cache), &transaction); let tuples = try_collect(executor)?; assert_eq!( @@ -559,6 +571,8 @@ mod test { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; + let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); let (keys, left, right, filter) = build_join_values(true); let op = JoinOperator { on: JoinCondition::On { @@ -567,7 +581,8 @@ mod test { }, join_type: JoinType::Cross, }; - let executor = NestedLoopJoin::from((op, left, right)).execute(&transaction); + let executor = NestedLoopJoin::from((op, left, right)) + .execute((&table_cache, &meta_cache), &transaction); let tuples = try_collect(executor)?; let mut expected_set = HashSet::with_capacity(1); @@ -585,6 +600,8 @@ mod test { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; + let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); let (keys, left, right, _) = build_join_values(true); let op = JoinOperator { on: JoinCondition::On { @@ -593,7 +610,8 @@ mod test { }, join_type: JoinType::Cross, }; - let executor = NestedLoopJoin::from((op, left, right)).execute(&transaction); + let executor = NestedLoopJoin::from((op, left, right)) + .execute((&table_cache, &meta_cache), &transaction); let tuples = try_collect(executor)?; let mut expected_set = HashSet::with_capacity(3); @@ -614,6 +632,8 @@ mod test { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; + let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); let (keys, left, right, _) = build_join_values(false); let op = JoinOperator { on: JoinCondition::On { @@ -622,7 +642,8 @@ mod test { }, join_type: JoinType::Cross, }; - let executor = NestedLoopJoin::from((op, left, right)).execute(&transaction); + let executor = NestedLoopJoin::from((op, left, right)) + .execute((&table_cache, &meta_cache), &transaction); let tuples = try_collect(executor)?; assert_eq!(tuples.len(), 16); @@ -635,6 +656,8 @@ mod test { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; + let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); let (keys, left, right, filter) = build_join_values(true); let op = JoinOperator { on: JoinCondition::On { @@ -643,7 +666,8 @@ mod test { }, join_type: JoinType::LeftSemi, }; - let executor = NestedLoopJoin::from((op, left, right)).execute(&transaction); + let executor = NestedLoopJoin::from((op, left, right)) + .execute((&table_cache, &meta_cache), &transaction); let tuples = try_collect(executor)?; let mut expected_set = HashSet::with_capacity(1); @@ -659,6 +683,8 @@ mod test { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; + let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); let (keys, left, right, filter) = build_join_values(true); let op = JoinOperator { on: JoinCondition::On { @@ -667,7 +693,8 @@ mod test { }, join_type: JoinType::LeftAnti, }; - let executor = NestedLoopJoin::from((op, left, right)).execute(&transaction); + let executor = NestedLoopJoin::from((op, left, right)) + .execute((&table_cache, &meta_cache), &transaction); let tuples = try_collect(executor)?; let mut expected_set = HashSet::with_capacity(3); @@ -685,6 +712,8 @@ mod test { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; + let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); + let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); let (keys, left, right, filter) = build_join_values(true); let op = JoinOperator { on: JoinCondition::On { @@ -693,7 +722,8 @@ mod test { }, join_type: JoinType::RightOuter, }; - let executor = NestedLoopJoin::from((op, left, right)).execute(&transaction); + let executor = NestedLoopJoin::from((op, left, right)) + .execute((&table_cache, &meta_cache), &transaction); let tuples = try_collect(executor)?; let mut expected_set = HashSet::with_capacity(4); diff --git a/src/execution/dql/limit.rs b/src/execution/dql/limit.rs index d50ba1ff..ad4a94ce 100644 --- a/src/execution/dql/limit.rs +++ b/src/execution/dql/limit.rs @@ -1,7 +1,7 @@ use crate::execution::{build_read, Executor, ReadExecutor}; use crate::planner::operator::limit::LimitOperator; use crate::planner::LogicalPlan; -use crate::storage::Transaction; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; use std::ops::Coroutine; use std::ops::CoroutineState; use std::pin::Pin; @@ -23,7 +23,11 @@ impl From<(LimitOperator, LogicalPlan)> for Limit { } impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Limit { - fn execute(self, transaction: &'a T) -> Executor<'a> { + fn execute( + self, + cache: (&'a TableCache, &'a StatisticsMetaCache), + transaction: &'a T, + ) -> Executor<'a> { Box::new( #[coroutine] move || { @@ -41,7 +45,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Limit { let offset_limit = offset_val + limit.unwrap_or(1) - 1; let mut i = 0; - let mut coroutine = build_read(input, transaction); + let mut coroutine = build_read(input, cache, transaction); while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { i += 1; diff --git a/src/execution/dql/projection.rs b/src/execution/dql/projection.rs index da87f0bc..e0ac6e85 100644 --- a/src/execution/dql/projection.rs +++ b/src/execution/dql/projection.rs @@ -4,7 +4,7 @@ use crate::execution::{build_read, Executor, ReadExecutor}; use crate::expression::ScalarExpression; use crate::planner::operator::project::ProjectOperator; use crate::planner::LogicalPlan; -use crate::storage::Transaction; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; use crate::throw; use crate::types::tuple::Tuple; use crate::types::value::ValueRef; @@ -24,13 +24,17 @@ impl From<(ProjectOperator, LogicalPlan)> for Projection { } impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Projection { - fn execute(self, transaction: &'a T) -> Executor<'a> { + fn execute( + self, + cache: (&'a TableCache, &'a StatisticsMetaCache), + transaction: &'a T, + ) -> Executor<'a> { Box::new( #[coroutine] move || { let Projection { exprs, mut input } = self; let schema = input.output_schema().clone(); - let mut coroutine = build_read(input, transaction); + let mut coroutine = build_read(input, cache, transaction); while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { let mut tuple = throw!(tuple); diff --git a/src/execution/dql/seq_scan.rs b/src/execution/dql/seq_scan.rs index 348387c8..17b60004 100644 --- a/src/execution/dql/seq_scan.rs +++ b/src/execution/dql/seq_scan.rs @@ -1,6 +1,6 @@ use crate::execution::{Executor, ReadExecutor}; use crate::planner::operator::scan::ScanOperator; -use crate::storage::{Iter, Transaction}; +use crate::storage::{Iter, StatisticsMetaCache, TableCache, Transaction}; use crate::throw; pub(crate) struct SeqScan { @@ -14,7 +14,11 @@ impl From for SeqScan { } impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for SeqScan { - fn execute(self, transaction: &'a T) -> Executor<'a> { + fn execute( + self, + (table_cache, _): (&'a TableCache, &'a StatisticsMetaCache), + transaction: &'a T, + ) -> Executor<'a> { Box::new( #[coroutine] move || { @@ -25,7 +29,9 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for SeqScan { .. } = self.op; - let mut iter = transaction.read(table_name, limit, columns).unwrap(); + let mut iter = transaction + .read(table_cache, table_name, limit, columns) + .unwrap(); while let Some(tuple) = throw!(iter.next_tuple()) { yield Ok(tuple); diff --git a/src/execution/dql/show_table.rs b/src/execution/dql/show_table.rs index 5f654426..d03af33d 100644 --- a/src/execution/dql/show_table.rs +++ b/src/execution/dql/show_table.rs @@ -1,6 +1,6 @@ use crate::catalog::TableMeta; use crate::execution::{Executor, ReadExecutor}; -use crate::storage::Transaction; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; use crate::throw; use crate::types::tuple::Tuple; use crate::types::value::{DataValue, Utf8Type}; @@ -10,7 +10,11 @@ use std::sync::Arc; pub struct ShowTables; impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for ShowTables { - fn execute(self, transaction: &'a T) -> Executor<'a> { + fn execute( + self, + _: (&'a TableCache, &'a StatisticsMetaCache), + transaction: &'a T, + ) -> Executor<'a> { Box::new( #[coroutine] move || { diff --git a/src/execution/dql/sort.rs b/src/execution/dql/sort.rs index 0e12749b..ef606758 100644 --- a/src/execution/dql/sort.rs +++ b/src/execution/dql/sort.rs @@ -2,7 +2,7 @@ use crate::errors::DatabaseError; use crate::execution::{build_read, Executor, ReadExecutor}; use crate::planner::operator::sort::{SortField, SortOperator}; use crate::planner::LogicalPlan; -use crate::storage::Transaction; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; use crate::throw; use crate::types::tuple::{Schema, Tuple}; use itertools::Itertools; @@ -87,7 +87,11 @@ impl From<(SortOperator, LogicalPlan)> for Sort { } impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Sort { - fn execute(self, transaction: &'a T) -> Executor<'a> { + fn execute( + self, + cache: (&'a TableCache, &'a StatisticsMetaCache), + transaction: &'a T, + ) -> Executor<'a> { Box::new( #[coroutine] move || { @@ -100,7 +104,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Sort { let schema = input.output_schema().clone(); let mut tuples: Vec = vec![]; - let mut coroutine = build_read(input, transaction); + let mut coroutine = build_read(input, cache, transaction); while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { tuples.push(throw!(tuple)); diff --git a/src/execution/dql/union.rs b/src/execution/dql/union.rs index 311e5523..b34e2e79 100644 --- a/src/execution/dql/union.rs +++ b/src/execution/dql/union.rs @@ -1,6 +1,6 @@ use crate::execution::{build_read, Executor, ReadExecutor}; use crate::planner::LogicalPlan; -use crate::storage::Transaction; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; use std::ops::Coroutine; use std::ops::CoroutineState; use std::pin::Pin; @@ -20,7 +20,11 @@ impl From<(LogicalPlan, LogicalPlan)> for Union { } impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Union { - fn execute(self, transaction: &'a T) -> Executor<'a> { + fn execute( + self, + cache: (&'a TableCache, &'a StatisticsMetaCache), + transaction: &'a T, + ) -> Executor<'a> { Box::new( #[coroutine] move || { @@ -28,12 +32,12 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Union { left_input, right_input, } = self; - let mut coroutine = build_read(left_input, transaction); + let mut coroutine = build_read(left_input, cache, transaction); while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { yield tuple; } - let mut coroutine = build_read(right_input, transaction); + let mut coroutine = build_read(right_input, cache, transaction); while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { yield tuple; diff --git a/src/execution/dql/values.rs b/src/execution/dql/values.rs index 0b72d568..22f58eef 100644 --- a/src/execution/dql/values.rs +++ b/src/execution/dql/values.rs @@ -1,6 +1,6 @@ use crate::execution::{Executor, ReadExecutor}; use crate::planner::operator::values::ValuesOperator; -use crate::storage::Transaction; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; use crate::types::tuple::Tuple; pub struct Values { @@ -14,7 +14,7 @@ impl From for Values { } impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Values { - fn execute(self, _: &T) -> Executor<'a> { + fn execute(self, _: (&'a TableCache, &'a StatisticsMetaCache), _: &T) -> Executor<'a> { Box::new( #[coroutine] move || { diff --git a/src/execution/mod.rs b/src/execution/mod.rs index 20464993..b050dd21 100644 --- a/src/execution/mod.rs +++ b/src/execution/mod.rs @@ -34,7 +34,7 @@ use crate::execution::dql::values::Values; use crate::planner::operator::join::JoinCondition; use crate::planner::operator::{Operator, PhysicalOption}; use crate::planner::LogicalPlan; -use crate::storage::Transaction; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; use crate::types::index::IndexInfo; use crate::types::tuple::Tuple; use std::ops::{Coroutine, CoroutineState}; @@ -44,14 +44,26 @@ pub type Executor<'a> = Box, Return = ()> + 'a + Unpin>; pub trait ReadExecutor<'a, T: Transaction + 'a> { - fn execute(self, transaction: &'a T) -> Executor<'a>; + fn execute( + self, + cache: (&'a TableCache, &'a StatisticsMetaCache), + transaction: &'a T, + ) -> Executor<'a>; } pub trait WriteExecutor<'a, T: Transaction + 'a> { - fn execute_mut(self, transaction: &'a mut T) -> Executor<'a>; + fn execute_mut( + self, + cache: (&'a TableCache, &'a StatisticsMetaCache), + transaction: &'a mut T, + ) -> Executor<'a>; } -pub fn build_read<'a, T: Transaction + 'a>(plan: LogicalPlan, transaction: &'a T) -> Executor<'a> { +pub fn build_read<'a, T: Transaction + 'a>( + plan: LogicalPlan, + cache: (&'a TableCache, &'a StatisticsMetaCache), + transaction: &'a T, +) -> Executor<'a> { let LogicalPlan { operator, mut childrens, @@ -59,20 +71,20 @@ pub fn build_read<'a, T: Transaction + 'a>(plan: LogicalPlan, transaction: &'a T } = plan; match operator { - Operator::Dummy => Dummy {}.execute(transaction), + Operator::Dummy => Dummy {}.execute(cache, transaction), Operator::Aggregate(op) => { let input = childrens.pop().unwrap(); if op.groupby_exprs.is_empty() { - SimpleAggExecutor::from((op, input)).execute(transaction) + SimpleAggExecutor::from((op, input)).execute(cache, transaction) } else { - HashAggExecutor::from((op, input)).execute(transaction) + HashAggExecutor::from((op, input)).execute(cache, transaction) } } Operator::Filter(op) => { let input = childrens.pop().unwrap(); - Filter::from((op, input)).execute(transaction) + Filter::from((op, input)).execute(cache, transaction) } Operator::Join(op) => { let right_input = childrens.pop().unwrap(); @@ -82,15 +94,17 @@ pub fn build_read<'a, T: Transaction + 'a>(plan: LogicalPlan, transaction: &'a T JoinCondition::On { on, .. } if !on.is_empty() && plan.physical_option == Some(PhysicalOption::HashJoin) => { - HashJoin::from((op, left_input, right_input)).execute(transaction) + HashJoin::from((op, left_input, right_input)).execute(cache, transaction) + } + _ => { + NestedLoopJoin::from((op, left_input, right_input)).execute(cache, transaction) } - _ => NestedLoopJoin::from((op, left_input, right_input)).execute(transaction), } } Operator::Project(op) => { let input = childrens.pop().unwrap(); - Projection::from((op, input)).execute(transaction) + Projection::from((op, input)).execute(cache, transaction) } Operator::Scan(op) => { if let Some(PhysicalOption::IndexScan(IndexInfo { @@ -98,34 +112,34 @@ pub fn build_read<'a, T: Transaction + 'a>(plan: LogicalPlan, transaction: &'a T range: Some(range), })) = plan.physical_option { - IndexScan::from((op, meta, range)).execute(transaction) + IndexScan::from((op, meta, range)).execute(cache, transaction) } else { - SeqScan::from(op).execute(transaction) + SeqScan::from(op).execute(cache, transaction) } } Operator::Sort(op) => { let input = childrens.pop().unwrap(); - Sort::from((op, input)).execute(transaction) + Sort::from((op, input)).execute(cache, transaction) } Operator::Limit(op) => { let input = childrens.pop().unwrap(); - Limit::from((op, input)).execute(transaction) + Limit::from((op, input)).execute(cache, transaction) } - Operator::Values(op) => Values::from(op).execute(transaction), - Operator::Show => ShowTables.execute(transaction), + Operator::Values(op) => Values::from(op).execute(cache, transaction), + Operator::Show => ShowTables.execute(cache, transaction), Operator::Explain => { let input = childrens.pop().unwrap(); - Explain::from(input).execute(transaction) + Explain::from(input).execute(cache, transaction) } - Operator::Describe(op) => Describe::from(op).execute(transaction), + Operator::Describe(op) => Describe::from(op).execute(cache, transaction), Operator::Union(_) => { let right_input = childrens.pop().unwrap(); let left_input = childrens.pop().unwrap(); - Union::from((left_input, right_input)).execute(transaction) + Union::from((left_input, right_input)).execute(cache, transaction) } _ => unreachable!(), } @@ -133,6 +147,7 @@ pub fn build_read<'a, T: Transaction + 'a>(plan: LogicalPlan, transaction: &'a T pub fn build_write<'a, T: Transaction + 'a>( plan: LogicalPlan, + cache: (&'a TableCache, &'a StatisticsMetaCache), transaction: &'a mut T, ) -> Executor<'a> { let LogicalPlan { @@ -146,36 +161,36 @@ pub fn build_write<'a, T: Transaction + 'a>( Operator::Insert(op) => { let input = childrens.pop().unwrap(); - Insert::from((op, input)).execute_mut(transaction) + Insert::from((op, input)).execute_mut(cache, transaction) } Operator::Update(op) => { let values = childrens.pop().unwrap(); let input = childrens.pop().unwrap(); - Update::from((op, input, values)).execute_mut(transaction) + Update::from((op, input, values)).execute_mut(cache, transaction) } Operator::Delete(op) => { let input = childrens.pop().unwrap(); - Delete::from((op, input)).execute_mut(transaction) + Delete::from((op, input)).execute_mut(cache, transaction) } Operator::AddColumn(op) => { let input = childrens.pop().unwrap(); - AddColumn::from((op, input)).execute_mut(transaction) + AddColumn::from((op, input)).execute_mut(cache, transaction) } Operator::DropColumn(op) => { let input = childrens.pop().unwrap(); - DropColumn::from((op, input)).execute_mut(transaction) + DropColumn::from((op, input)).execute_mut(cache, transaction) } - Operator::CreateTable(op) => CreateTable::from(op).execute_mut(transaction), + Operator::CreateTable(op) => CreateTable::from(op).execute_mut(cache, transaction), Operator::CreateIndex(op) => { let input = childrens.pop().unwrap(); - CreateIndex::from((op, input)).execute_mut(transaction) + CreateIndex::from((op, input)).execute_mut(cache, transaction) } - Operator::DropTable(op) => DropTable::from(op).execute_mut(transaction), - Operator::Truncate(op) => Truncate::from(op).execute_mut(transaction), - Operator::CopyFromFile(op) => CopyFromFile::from(op).execute_mut(transaction), + Operator::DropTable(op) => DropTable::from(op).execute_mut(cache, transaction), + Operator::Truncate(op) => Truncate::from(op).execute_mut(cache, transaction), + Operator::CopyFromFile(op) => CopyFromFile::from(op).execute_mut(cache, transaction), #[warn(unused_assignments)] Operator::CopyToFile(_op) => { todo!() @@ -183,7 +198,7 @@ pub fn build_write<'a, T: Transaction + 'a>( Operator::Analyze(op) => { let input = childrens.pop().unwrap(); - Analyze::from((op, input)).execute_mut(transaction) + Analyze::from((op, input)).execute_mut(cache, transaction) } operator => build_read( LogicalPlan { @@ -192,6 +207,7 @@ pub fn build_write<'a, T: Transaction + 'a>( physical_option, _output_schema_ref, }, + cache, transaction, ), } diff --git a/src/optimizer/core/memo.rs b/src/optimizer/core/memo.rs index 1d18bc4a..c22eb5f6 100644 --- a/src/optimizer/core/memo.rs +++ b/src/optimizer/core/memo.rs @@ -117,7 +117,12 @@ mod tests { let transaction = database.storage.transaction()?; let functions = Default::default(); let mut binder = Binder::new( - BinderContext::new(&transaction, &functions, Arc::new(AtomicUsize::new(0))), + BinderContext::new( + &database.table_cache, + &transaction, + &functions, + Arc::new(AtomicUsize::new(0)), + ), None, ); // where: c1 => 2, (40, +inf) @@ -149,7 +154,11 @@ mod tests { ImplementationRuleImpl::IndexScan, ]; - let memo = Memo::new(&graph, &transaction.meta_loader(), &rules)?; + let memo = Memo::new( + &graph, + &transaction.meta_loader(&database.meta_cache), + &rules, + )?; let best_plan = graph.into_plan(Some(&memo)); let exprs = &memo.groups.get(&NodeIndex::new(3)).unwrap(); diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 8394f25e..e75e0abc 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use std::{mem, slice}; pub(crate) type StatisticsMetaCache = ShardingLruCache<(TableName, IndexId), StatisticsMeta>; +pub(crate) type TableCache = ShardingLruCache; pub trait Storage: Clone { type TransactionType<'a>: Transaction @@ -41,6 +42,7 @@ pub trait Transaction: Sized { /// The projections is column indices. fn read( &self, + table_cache: &TableCache, table_name: TableName, bounds: Bounds, mut columns: Vec<(usize, ColumnRef)>, @@ -49,7 +51,7 @@ pub trait Transaction: Sized { assert!(columns.iter().map(|(i, _)| i).all_unique()); let table = self - .table(table_name.clone()) + .table(table_cache, table_name.clone()) .ok_or(DatabaseError::TableNotFound)?; let table_types = table.types(); if columns.is_empty() { @@ -76,19 +78,20 @@ pub trait Transaction: Sized { }) } - fn read_by_index( - &self, + fn read_by_index<'a>( + &'a self, + table_cache: &'a TableCache, table_name: TableName, (offset_option, limit_option): Bounds, columns: Vec<(usize, ColumnRef)>, index_meta: IndexMetaRef, ranges: Vec, - ) -> Result, DatabaseError> { + ) -> Result, DatabaseError> { assert!(columns.is_sorted_by_key(|(i, _)| i)); assert!(columns.iter().map(|(i, _)| i).all_unique()); let table = self - .table(table_name.clone()) + .table(table_cache, table_name.clone()) .ok_or(DatabaseError::TableNotFound)?; let table_types = table.types(); let table_name = table.name.as_str(); @@ -121,16 +124,17 @@ pub trait Transaction: Sized { fn add_index_meta( &mut self, + table_cache: &TableCache, table_name: &TableName, index_name: String, column_ids: Vec, ty: IndexType, ) -> Result { - if let Some(mut table) = self.table(table_name.clone()).cloned() { + if let Some(mut table) = self.table(table_cache, table_name.clone()).cloned() { let index_meta = table.add_index_meta(index_name, column_ids, ty)?; let (key, value) = TableCodec::encode_index_meta(table_name, index_meta)?; self.set(key, value)?; - self.table_cache().remove(table_name); + table_cache.remove(table_name); Ok(index_meta.id) } else { @@ -203,11 +207,12 @@ pub trait Transaction: Sized { fn add_column( &mut self, + table_cache: &TableCache, table_name: &TableName, column: &ColumnCatalog, if_not_exists: bool, ) -> Result { - if let Some(mut table) = self.table(table_name.clone()).cloned() { + if let Some(mut table) = self.table(table_cache, table_name.clone()).cloned() { if !column.nullable && column.default_value()?.is_none() { return Err(DatabaseError::NeedNullAbleOrDefault); } @@ -236,7 +241,7 @@ pub trait Transaction: Sized { let column = table.get_column_by_id(&col_id).unwrap(); let (key, value) = TableCodec::encode_column(table_name, column)?; self.set(key, value)?; - self.table_cache().remove(table_name); + table_cache.remove(table_name); Ok(col_id) } else { @@ -246,10 +251,11 @@ pub trait Transaction: Sized { fn drop_column( &mut self, + table_cache: &TableCache, table_name: &TableName, column_name: &str, ) -> Result<(), DatabaseError> { - if let Some(table_catalog) = self.table(table_name.clone()).cloned() { + if let Some(table_catalog) = self.table(table_cache, table_name.clone()).cloned() { let column = table_catalog.get_column_by_name(column_name).unwrap(); let (key, _) = TableCodec::encode_column(table_name, column)?; @@ -265,7 +271,7 @@ pub trait Transaction: Sized { let (index_min, index_max) = TableCodec::index_bound(table_name, &index_meta.id); self._drop_data(&index_min, &index_max)?; } - self.table_cache().remove(table_name); + table_cache.remove(table_name); Ok(()) } else { @@ -275,6 +281,7 @@ pub trait Transaction: Sized { fn create_table( &mut self, + table_cache: &TableCache, table_name: TableName, columns: Vec, if_not_exists: bool, @@ -299,30 +306,34 @@ pub trait Transaction: Sized { let (key, value) = TableCodec::encode_column(&table_name, column)?; self.set(key, value)?; } - self.table_cache() - .put(table_name.to_string(), table_catalog); + table_cache.put(table_name.to_string(), table_catalog); Ok(table_name) } - fn drop_table(&mut self, table_name: &str, if_exists: bool) -> Result<(), DatabaseError> { - if self.table(Arc::new(table_name.to_string())).is_none() { + fn drop_table( + &mut self, + table_cache: &TableCache, + table_name: TableName, + if_exists: bool, + ) -> Result<(), DatabaseError> { + if self.table(table_cache, table_name.clone()).is_none() { if if_exists { return Ok(()); } else { return Err(DatabaseError::TableNotFound); } } - self.drop_data(table_name)?; + self.drop_data(table_name.as_str())?; - let (column_min, column_max) = TableCodec::columns_bound(table_name); + let (column_min, column_max) = TableCodec::columns_bound(table_name.as_str()); self._drop_data(&column_min, &column_max)?; - let (index_meta_min, index_meta_max) = TableCodec::index_meta_bound(table_name); + let (index_meta_min, index_meta_max) = TableCodec::index_meta_bound(table_name.as_str()); self._drop_data(&index_meta_min, &index_meta_max)?; - self.remove(&TableCodec::encode_root_table_key(table_name))?; - self.table_cache().remove(&table_name.to_string()); + self.remove(&TableCodec::encode_root_table_key(table_name.as_str()))?; + table_cache.remove(&table_name); Ok(()) } @@ -337,8 +348,12 @@ pub trait Transaction: Sized { Ok(()) } - fn table(&self, table_name: TableName) -> Option<&TableCatalog> { - self.table_cache() + fn table<'a>( + &'a self, + table_cache: &'a TableCache, + table_name: TableName, + ) -> Option<&TableCatalog> { + table_cache .get_or_insert(table_name.to_string(), |_| { // TODO: unify the data into a `Meta` prefix and use one iteration to collect all data let (columns, indexes) = self.table_collect(table_name.clone())?; @@ -364,15 +379,14 @@ pub trait Transaction: Sized { fn save_table_meta( &mut self, + meta_cache: &StatisticsMetaCache, table_name: &TableName, path: String, statistics_meta: StatisticsMeta, ) -> Result<(), DatabaseError> { // TODO: clean old meta file let index_id = statistics_meta.index_id(); - let _ = self - .meta_cache() - .put((table_name.clone(), index_id), statistics_meta); + meta_cache.put((table_name.clone(), index_id), statistics_meta); let (key, value) = TableCodec::encode_statistics_path(table_name.as_str(), index_id, path); self.set(key, value)?; @@ -391,11 +405,11 @@ pub trait Transaction: Sized { .transpose() } - fn meta_loader(&self) -> StatisticMetaLoader + fn meta_loader<'a>(&'a self, meta_cache: &'a StatisticsMetaCache) -> StatisticMetaLoader where Self: Sized, { - StatisticMetaLoader::new(self, self.meta_cache()) + StatisticMetaLoader::new(self, meta_cache) } fn table_collect( @@ -483,9 +497,6 @@ pub trait Transaction: Sized { max: Bound<&[u8]>, ) -> Result, DatabaseError>; - fn table_cache(&self) -> &ShardingLruCache; - fn meta_cache(&self) -> &StatisticsMetaCache; - fn commit(self) -> Result<(), DatabaseError>; } diff --git a/src/storage/rocksdb.rs b/src/storage/rocksdb.rs index d6033c3d..d1c7a7b4 100644 --- a/src/storage/rocksdb.rs +++ b/src/storage/rocksdb.rs @@ -1,19 +1,14 @@ -use crate::catalog::TableCatalog; use crate::errors::DatabaseError; -use crate::storage::{InnerIter, StatisticsMetaCache, Storage, Transaction}; -use crate::utils::lru::ShardingLruCache; +use crate::storage::{InnerIter, Storage, Transaction}; use bytes::Bytes; use rocksdb::{DBIteratorWithThreadMode, Direction, IteratorMode, OptimisticTransactionDB}; use std::collections::Bound; -use std::hash::RandomState; use std::path::PathBuf; use std::sync::Arc; #[derive(Clone)] pub struct RocksStorage { pub inner: Arc, - pub(crate) meta_cache: Arc, - pub(crate) table_cache: Arc>, } impl RocksStorage { @@ -26,13 +21,9 @@ impl RocksStorage { opts.create_if_missing(true); let storage = OptimisticTransactionDB::open(&opts, path.into())?; - let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); - let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); Ok(RocksStorage { inner: Arc::new(storage), - meta_cache, - table_cache, }) } } @@ -45,16 +36,12 @@ impl Storage for RocksStorage { fn transaction(&self) -> Result, DatabaseError> { Ok(RocksTransaction { tx: self.inner.transaction(), - meta_cache: self.meta_cache.clone(), - table_cache: self.table_cache.clone(), }) } } pub struct RocksTransaction<'db> { tx: rocksdb::Transaction<'db, OptimisticTransactionDB>, - pub(crate) meta_cache: Arc, - pub(crate) table_cache: Arc>, } impl<'txn> Transaction for RocksTransaction<'txn> { @@ -103,14 +90,6 @@ impl<'txn> Transaction for RocksTransaction<'txn> { }) } - fn table_cache(&self) -> &ShardingLruCache { - self.table_cache.as_ref() - } - - fn meta_cache(&self) -> &StatisticsMetaCache { - self.meta_cache.as_ref() - } - fn commit(self) -> Result<(), DatabaseError> { self.tx.commit()?; Ok(()) @@ -127,18 +106,20 @@ impl InnerIter for RocksIter<'_, '_> { fn try_next(&mut self) -> Result, DatabaseError> { for result in self.iter.by_ref() { let (key, value) = result?; - let lower_bound_check = match &self.lower { - Bound::Included(ref lower) => key.as_ref() >= lower.as_slice(), - Bound::Excluded(ref lower) => key.as_ref() > lower.as_slice(), - Bound::Unbounded => true, - }; let upper_bound_check = match &self.upper { Bound::Included(ref upper) => key.as_ref() <= upper.as_slice(), Bound::Excluded(ref upper) => key.as_ref() < upper.as_slice(), Bound::Unbounded => true, }; - - if lower_bound_check && upper_bound_check { + if !upper_bound_check { + break; + } + let lower_bound_check = match &self.lower { + Bound::Included(ref lower) => key.as_ref() >= lower.as_slice(), + Bound::Excluded(ref lower) => key.as_ref() > lower.as_slice(), + Bound::Unbounded => true, + }; + if lower_bound_check { return Ok(Some((Bytes::from(key), Bytes::from(value)))); } } @@ -160,16 +141,19 @@ mod test { use crate::types::tuple::Tuple; use crate::types::value::DataValue; use crate::types::LogicalType; + use crate::utils::lru::ShardingLruCache; use itertools::Itertools; use std::collections::{Bound, VecDeque}; + use std::hash::RandomState; use std::sync::Arc; use tempfile::TempDir; #[test] - fn test_in_kipdb_storage_works_with_data() -> Result<(), DatabaseError> { + fn test_in_rocksdb_storage_works_with_data() -> Result<(), DatabaseError> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; let mut transaction = storage.transaction()?; + let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?); let columns = Arc::new(vec![ Arc::new(ColumnCatalog::new( "c1".to_string(), @@ -187,9 +171,14 @@ mod test { .iter() .map(|col_ref| ColumnCatalog::clone(&col_ref)) .collect_vec(); - let _ = transaction.create_table(Arc::new("test".to_string()), source_columns, false)?; + let _ = transaction.create_table( + &table_cache, + Arc::new("test".to_string()), + source_columns, + false, + )?; - let table_catalog = transaction.table(Arc::new("test".to_string())); + let table_catalog = transaction.table(&table_cache, Arc::new("test".to_string())); assert!(table_catalog.is_some()); assert!(table_catalog .unwrap() @@ -222,6 +211,7 @@ mod test { )?; let mut iter = transaction.read( + &table_cache, Arc::new("test".to_string()), (Some(1), Some(1)), vec![(0, columns[0].clone())], @@ -249,7 +239,10 @@ mod test { let transaction = fnck_sql.storage.transaction()?; let table_name = Arc::new("t1".to_string()); - let table = transaction.table(table_name.clone()).unwrap().clone(); + let table = transaction + .table(&fnck_sql.table_cache, table_name.clone()) + .unwrap() + .clone(); let tuple_ids = vec![ Arc::new(DataValue::Int32(Some(0))), Arc::new(DataValue::Int32(Some(2))), @@ -304,12 +297,13 @@ mod test { let transaction = fnck_sql.storage.transaction().unwrap(); let table = transaction - .table(Arc::new("t1".to_string())) + .table(&fnck_sql.table_cache, Arc::new("t1".to_string())) .unwrap() .clone(); let columns = table.columns().cloned().enumerate().collect_vec(); let mut iter = transaction .read_by_index( + &fnck_sql.table_cache, Arc::new("t1".to_string()), (Some(0), Some(1)), columns, diff --git a/src/utils/bit_vector.rs b/src/utils/bit_vector.rs index 5c93c77e..c0c0be17 100644 --- a/src/utils/bit_vector.rs +++ b/src/utils/bit_vector.rs @@ -4,6 +4,7 @@ use std::slice; #[derive(Debug, Default)] pub struct BitVector { + #[allow(dead_code)] len: u64, bit_groups: Vec, } @@ -31,14 +32,17 @@ impl BitVector { self.bit_groups[index / 8] >> (index % 8) & 1 != 0 } + #[allow(dead_code)] pub fn len(&self) -> usize { self.len as usize } + #[allow(dead_code)] pub fn is_empty(&self) -> bool { self.len == 0 } + #[allow(dead_code)] pub fn to_raw(&self, bytes: &mut Vec) { bytes.append(&mut u64::encode_fixed_vec(self.len)); @@ -47,6 +51,7 @@ impl BitVector { } } + #[allow(dead_code)] pub fn from_raw(bytes: &[u8]) -> Self { let len = u64::decode_fixed(&bytes[0..8]); let bit_groups = bytes[8..] diff --git a/src/utils/lru.rs b/src/utils/lru.rs index 44d20f85..07566566 100644 --- a/src/utils/lru.rs +++ b/src/utils/lru.rs @@ -283,6 +283,7 @@ impl LruCache { } } + #[allow(dead_code)] #[inline] pub fn get(&mut self, key: &K) -> Option<&V> { if let Some(node) = self.inner.get(key) { @@ -333,6 +334,7 @@ impl LruCache { } } + #[allow(dead_code)] #[inline] pub fn get_or_insert(&mut self, key: K, fn_once: F) -> Result<&V, DatabaseError> where @@ -342,6 +344,7 @@ impl LruCache { .map(|node| unsafe { &node.as_ref().value }) } + #[allow(dead_code)] #[inline] pub fn len(&self) -> usize { self.inner.len() @@ -352,6 +355,7 @@ impl LruCache { self.inner.is_empty() } + #[allow(dead_code)] #[inline] pub fn iter(&self) -> LruCacheIter { LruCacheIter {