Skip to content

Commit

Permalink
perf: strip table_cache and meta_cache out of storage & optimize `RocksIter` (#214)
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould authored Aug 18, 2024
1 parent c8e9dd1 commit 811d182
Show file tree
Hide file tree
Showing 40 changed files with 509 additions and 246 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

[package]
name = "fnck_sql"
version = "0.0.2"
version = "0.0.3"
edition = "2021"
authors = ["Kould <[email protected]>", "Xwg <[email protected]>"]
description = "SQL as a Function for Rust"
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/query_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::path::Path;

const QUERY_BENCH_FNCK_SQL_PATH: &'static str = "./fncksql_bench";
const QUERY_BENCH_SQLITE_PATH: &'static str = "./sqlite_bench";
const TABLE_ROW_NUM: u64 = 2_00_000;
const TABLE_ROW_NUM: u64 = 200_000;

fn query_cases() -> Vec<(&'static str, &'static str)> {
vec![
Expand Down
10 changes: 9 additions & 1 deletion src/binder/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ mod tests {
use crate::storage::rocksdb::RocksStorage;
use crate::storage::Storage;
use crate::types::LogicalType;
use crate::utils::lru::ShardingLruCache;
use sqlparser::ast::CharLengthUnits;
use std::hash::RandomState;
use std::sync::atomic::AtomicUsize;
use tempfile::TempDir;

Expand All @@ -155,11 +157,17 @@ mod tests {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let storage = RocksStorage::new(temp_dir.path())?;
let transaction = storage.transaction()?;
let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let functions = Default::default();

let sql = "create table t1 (id int primary key, name varchar(10) null)";
let mut binder = Binder::new(
BinderContext::new(&transaction, &functions, Arc::new(AtomicUsize::new(0))),
BinderContext::new(
&table_cache,
&transaction,
&functions,
Arc::new(AtomicUsize::new(0)),
),
None,
);
let stmt = crate::parser::parse_sql(sql).unwrap();
Expand Down
3 changes: 2 additions & 1 deletion src/binder/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,13 +248,14 @@ impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
subquery: &Query,
) -> Result<(LogicalPlan, Arc<ColumnCatalog>), DatabaseError> {
let BinderContext {
table_cache,
transaction,
functions,
temp_table_id,
..
} = &self.context;
let mut binder = Binder::new(
BinderContext::new(*transaction, functions, temp_table_id.clone()),
BinderContext::new(table_cache, *transaction, functions, temp_table_id.clone()),
Some(self),
);
let mut sub_query = binder.bind_query(subquery)?;
Expand Down
30 changes: 22 additions & 8 deletions src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::errors::DatabaseError;
use crate::expression::ScalarExpression;
use crate::planner::operator::join::JoinType;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use crate::storage::{TableCache, Transaction};

pub enum InputRefType {
AggCall,
Expand Down Expand Up @@ -83,6 +83,7 @@ pub enum SubQueryType {
#[derive(Clone)]
pub struct BinderContext<'a, T: Transaction> {
pub(crate) functions: &'a Functions,
pub(crate) table_cache: &'a TableCache,
pub(crate) transaction: &'a T,
// Tips: When there are multiple tables and Wildcard, use BTreeMap to ensure that the order of the output tables is certain.
pub(crate) bind_table:
Expand All @@ -105,12 +106,14 @@ pub struct BinderContext<'a, T: Transaction> {

impl<'a, T: Transaction> BinderContext<'a, T> {
pub fn new(
table_cache: &'a TableCache,
transaction: &'a T,
functions: &'a Functions,
temp_table_id: Arc<AtomicUsize>,
) -> Self {
BinderContext {
functions,
table_cache,
transaction,
bind_table: Default::default(),
expr_aliases: Default::default(),
Expand Down Expand Up @@ -157,9 +160,9 @@ impl<'a, T: Transaction> BinderContext<'a, T> {

pub fn table(&self, table_name: TableName) -> Option<&TableCatalog> {
if let Some(real_name) = self.table_aliases.get(table_name.as_ref()) {
self.transaction.table(real_name.clone())
self.transaction.table(self.table_cache, real_name.clone())
} else {
self.transaction.table(table_name)
self.transaction.table(self.table_cache, table_name)
}
}

Expand All @@ -170,9 +173,9 @@ impl<'a, T: Transaction> BinderContext<'a, T> {
join_type: Option<JoinType>,
) -> Result<&TableCatalog, DatabaseError> {
let table = if let Some(real_name) = self.table_aliases.get(table_name.as_ref()) {
self.transaction.table(real_name.clone())
self.transaction.table(self.table_cache, real_name.clone())
} else {
self.transaction.table(table_name.clone())
self.transaction.table(self.table_cache, table_name.clone())
}
.ok_or(DatabaseError::TableNotFound)?;

Expand Down Expand Up @@ -380,20 +383,24 @@ pub mod test {
use crate::errors::DatabaseError;
use crate::planner::LogicalPlan;
use crate::storage::rocksdb::RocksStorage;
use crate::storage::{Storage, Transaction};
use crate::storage::{Storage, TableCache, Transaction};
use crate::types::LogicalType::Integer;
use crate::utils::lru::ShardingLruCache;
use std::hash::RandomState;
use std::path::PathBuf;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use tempfile::TempDir;

pub(crate) fn build_test_catalog(
table_cache: &TableCache,
path: impl Into<PathBuf> + Send,
) -> Result<RocksStorage, DatabaseError> {
let storage = RocksStorage::new(path)?;
let mut transaction = storage.transaction()?;

let _ = transaction.create_table(
table_cache,
Arc::new("t1".to_string()),
vec![
ColumnCatalog::new(
Expand All @@ -411,6 +418,7 @@ pub mod test {
)?;

let _ = transaction.create_table(
table_cache,
Arc::new("t2".to_string()),
vec![
ColumnCatalog::new(
Expand All @@ -434,11 +442,17 @@ pub mod test {

pub fn select_sql_run<S: AsRef<str>>(sql: S) -> Result<LogicalPlan, DatabaseError> {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let storage = build_test_catalog(temp_dir.path())?;
let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let storage = build_test_catalog(&table_cache, temp_dir.path())?;
let transaction = storage.transaction()?;
let functions = Default::default();
let mut binder = Binder::new(
BinderContext::new(&transaction, &functions, Arc::new(AtomicUsize::new(0))),
BinderContext::new(
&table_cache,
&transaction,
&functions,
Arc::new(AtomicUsize::new(0)),
),
None,
);
let stmt = crate::parser::parse_sql(sql)?;
Expand Down
3 changes: 2 additions & 1 deletion src/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,13 +494,14 @@ impl<'a: 'b, 'b, T: Transaction> Binder<'a, 'b, T> {
_ => unimplemented!(),
};
let BinderContext {
table_cache,
transaction,
functions,
temp_table_id,
..
} = &self.context;
let mut binder = Binder::new(
BinderContext::new(*transaction, functions, temp_table_id.clone()),
BinderContext::new(table_cache, *transaction, functions, temp_table_id.clone()),
Some(self),
);
let mut right = binder.bind_single_table_ref(relation, Some(join_type))?;
Expand Down
75 changes: 54 additions & 21 deletions src/db.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::binder::{command_type, Binder, BinderContext, CommandType};
use crate::catalog::TableCatalog;
use crate::errors::DatabaseError;
use crate::execution::{build_write, try_collect};
use crate::expression::function::{FunctionSummary, ScalarFunctionImpl};
Expand All @@ -9,12 +10,14 @@ use crate::optimizer::rule::normalization::NormalizationRuleImpl;
use crate::parser::parse_sql;
use crate::planner::LogicalPlan;
use crate::storage::rocksdb::RocksStorage;
use crate::storage::{Storage, Transaction};
use crate::storage::{StatisticsMetaCache, Storage, TableCache, Transaction};
use crate::types::tuple::{SchemaRef, Tuple};
use crate::udf::current_date::CurrentDate;
use crate::utils::lru::ShardingLruCache;
use ahash::HashMap;
use parking_lot::{ArcRwLockReadGuard, ArcRwLockWriteGuard, RawRwLock, RwLock};
use sqlparser::ast::Statement;
use std::hash::RandomState;
use std::path::PathBuf;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
Expand Down Expand Up @@ -51,19 +54,25 @@ impl DataBaseBuilder {
self = self.register_function(CurrentDate::new());

let storage = RocksStorage::new(self.path)?;
let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);

Ok(Database {
storage,
functions: Arc::new(self.functions),
mdl: Arc::new(RwLock::new(())),
meta_cache,
table_cache,
})
}
}

pub struct Database<S: Storage> {
pub storage: S,
pub(crate) storage: S,
functions: Arc<Functions>,
mdl: Arc<RwLock<()>>,
pub(crate) meta_cache: Arc<StatisticsMetaCache>,
pub(crate) table_cache: Arc<ShardingLruCache<String, TableCatalog>>,
}

impl<S: Storage> Database<S> {
Expand All @@ -80,18 +89,21 @@ impl<S: Storage> Database<S> {
} else {
MetaDataLock::Read(self.mdl.read_arc())
};
let transaction = self.storage.transaction()?;
let plan = Self::build_plan(stmt, &transaction, &self.functions)?;

Self::run_volcano(transaction, plan)
}
let mut transaction = self.storage.transaction()?;
let mut plan = Self::build_plan(
stmt,
&self.table_cache,
&self.meta_cache,
&transaction,
&self.functions,
)?;

pub(crate) fn run_volcano(
mut transaction: <S as Storage>::TransactionType<'_>,
mut plan: LogicalPlan,
) -> Result<(SchemaRef, Vec<Tuple>), DatabaseError> {
let schema = plan.output_schema().clone();
let iterator = build_write(plan, &mut transaction);
let iterator = build_write(
plan,
(&self.table_cache, &self.meta_cache),
&mut transaction,
);
let tuples = try_collect(iterator)?;

transaction.commit()?;
Expand All @@ -107,16 +119,25 @@ impl<S: Storage> Database<S> {
inner: transaction,
functions: self.functions.clone(),
_guard: guard,
meta_cache: self.meta_cache.clone(),
table_cache: self.table_cache.clone(),
})
}

pub(crate) fn build_plan(
stmt: &Statement,
table_cache: &TableCache,
meta_cache: &StatisticsMetaCache,
transaction: &<S as Storage>::TransactionType<'_>,
functions: &Functions,
) -> Result<LogicalPlan, DatabaseError> {
let mut binder = Binder::new(
BinderContext::new(transaction, functions, Arc::new(AtomicUsize::new(0))),
BinderContext::new(
table_cache,
transaction,
functions,
Arc::new(AtomicUsize::new(0)),
),
None,
);
/// Build a logical plan.
Expand All @@ -129,8 +150,8 @@ impl<S: Storage> Database<S> {
let source_plan = binder.bind(stmt)?;
// println!("source_plan plan: {:#?}", source_plan);

let best_plan =
Self::default_optimizer(source_plan).find_best(Some(&transaction.meta_loader()))?;
let best_plan = Self::default_optimizer(source_plan)
.find_best(Some(&transaction.meta_loader(meta_cache)))?;
// println!("best_plan plan: {:#?}", best_plan);

Ok(best_plan)
Expand Down Expand Up @@ -221,6 +242,8 @@ pub struct DBTransaction<'a, S: Storage + 'a> {
inner: S::TransactionType<'a>,
functions: Arc<Functions>,
_guard: ArcRwLockReadGuard<RawRwLock, ()>,
pub(crate) meta_cache: Arc<StatisticsMetaCache>,
pub(crate) table_cache: Arc<ShardingLruCache<String, TableCatalog>>,
}

impl<S: Storage> DBTransaction<'_, S> {
Expand All @@ -235,10 +258,16 @@ impl<S: Storage> DBTransaction<'_, S> {
"`DDL` is not allowed to execute within a transaction".to_string(),
));
}
let mut plan = Database::<S>::build_plan(stmt, &self.inner, &self.functions)?;
let mut plan = Database::<S>::build_plan(
stmt,
&self.table_cache,
&self.meta_cache,
&self.inner,
&self.functions,
)?;

let schema = plan.output_schema().clone();
let executor = build_write(plan, &mut self.inner);
let executor = build_write(plan, (&self.table_cache, &self.meta_cache), &mut self.inner);

Ok((schema, try_collect(executor)?))
}
Expand All @@ -258,7 +287,7 @@ mod test {
use crate::expression::ScalarExpression;
use crate::expression::{BinaryOperator, UnaryOperator};
use crate::function;
use crate::storage::{Storage, Transaction};
use crate::storage::{Storage, TableCache, Transaction};
use crate::types::evaluator::EvaluatorFactory;
use crate::types::tuple::{create_table, Tuple};
use crate::types::value::{DataValue, ValueRef};
Expand All @@ -268,7 +297,10 @@ mod test {
use std::sync::Arc;
use tempfile::TempDir;

fn build_table(mut transaction: impl Transaction) -> Result<(), DatabaseError> {
fn build_table(
table_cache: &TableCache,
mut transaction: impl Transaction,
) -> Result<(), DatabaseError> {
let columns = vec![
ColumnCatalog::new(
"c1".to_string(),
Expand All @@ -281,7 +313,8 @@ mod test {
ColumnDesc::new(LogicalType::Boolean, false, false, None),
),
];
let _ = transaction.create_table(Arc::new("t1".to_string()), columns, false)?;
let _ =
transaction.create_table(table_cache, Arc::new("t1".to_string()), columns, false)?;
transaction.commit()?;

Ok(())
Expand All @@ -292,7 +325,7 @@ mod test {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let database = DataBaseBuilder::path(temp_dir.path()).build()?;
let transaction = database.storage.transaction()?;
build_table(transaction)?;
build_table(&database.table_cache, transaction)?;

let batch = database.run("select * from t1")?;

Expand Down
Loading

0 comments on commit 811d182

Please sign in to comment.