Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: strip table_cache and meta_cache out of storage & optimize … #214

Merged
merged 1 commit into from
Aug 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

[package]
name = "fnck_sql"
version = "0.0.2"
version = "0.0.3"
edition = "2021"
authors = ["Kould <[email protected]>", "Xwg <[email protected]>"]
description = "SQL as a Function for Rust"
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/query_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::path::Path;

const QUERY_BENCH_FNCK_SQL_PATH: &'static str = "./fncksql_bench";
const QUERY_BENCH_SQLITE_PATH: &'static str = "./sqlite_bench";
const TABLE_ROW_NUM: u64 = 2_00_000;
const TABLE_ROW_NUM: u64 = 200_000;

fn query_cases() -> Vec<(&'static str, &'static str)> {
vec![
Expand Down
10 changes: 9 additions & 1 deletion src/binder/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ mod tests {
use crate::storage::rocksdb::RocksStorage;
use crate::storage::Storage;
use crate::types::LogicalType;
use crate::utils::lru::ShardingLruCache;
use sqlparser::ast::CharLengthUnits;
use std::hash::RandomState;
use std::sync::atomic::AtomicUsize;
use tempfile::TempDir;

Expand All @@ -155,11 +157,17 @@ mod tests {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let storage = RocksStorage::new(temp_dir.path())?;
let transaction = storage.transaction()?;
let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let functions = Default::default();

let sql = "create table t1 (id int primary key, name varchar(10) null)";
let mut binder = Binder::new(
BinderContext::new(&transaction, &functions, Arc::new(AtomicUsize::new(0))),
BinderContext::new(
&table_cache,
&transaction,
&functions,
Arc::new(AtomicUsize::new(0)),
),
None,
);
let stmt = crate::parser::parse_sql(sql).unwrap();
Expand Down
3 changes: 2 additions & 1 deletion src/binder/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,13 +248,14 @@ impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
subquery: &Query,
) -> Result<(LogicalPlan, Arc<ColumnCatalog>), DatabaseError> {
let BinderContext {
table_cache,
transaction,
functions,
temp_table_id,
..
} = &self.context;
let mut binder = Binder::new(
BinderContext::new(*transaction, functions, temp_table_id.clone()),
BinderContext::new(table_cache, *transaction, functions, temp_table_id.clone()),
Some(self),
);
let mut sub_query = binder.bind_query(subquery)?;
Expand Down
30 changes: 22 additions & 8 deletions src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::errors::DatabaseError;
use crate::expression::ScalarExpression;
use crate::planner::operator::join::JoinType;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use crate::storage::{TableCache, Transaction};

pub enum InputRefType {
AggCall,
Expand Down Expand Up @@ -83,6 +83,7 @@ pub enum SubQueryType {
#[derive(Clone)]
pub struct BinderContext<'a, T: Transaction> {
pub(crate) functions: &'a Functions,
pub(crate) table_cache: &'a TableCache,
pub(crate) transaction: &'a T,
// Tips: When there are multiple tables and Wildcard, use BTreeMap to ensure that the order of the output tables is certain.
pub(crate) bind_table:
Expand All @@ -105,12 +106,14 @@ pub struct BinderContext<'a, T: Transaction> {

impl<'a, T: Transaction> BinderContext<'a, T> {
pub fn new(
table_cache: &'a TableCache,
transaction: &'a T,
functions: &'a Functions,
temp_table_id: Arc<AtomicUsize>,
) -> Self {
BinderContext {
functions,
table_cache,
transaction,
bind_table: Default::default(),
expr_aliases: Default::default(),
Expand Down Expand Up @@ -157,9 +160,9 @@ impl<'a, T: Transaction> BinderContext<'a, T> {

pub fn table(&self, table_name: TableName) -> Option<&TableCatalog> {
if let Some(real_name) = self.table_aliases.get(table_name.as_ref()) {
self.transaction.table(real_name.clone())
self.transaction.table(self.table_cache, real_name.clone())
} else {
self.transaction.table(table_name)
self.transaction.table(self.table_cache, table_name)
}
}

Expand All @@ -170,9 +173,9 @@ impl<'a, T: Transaction> BinderContext<'a, T> {
join_type: Option<JoinType>,
) -> Result<&TableCatalog, DatabaseError> {
let table = if let Some(real_name) = self.table_aliases.get(table_name.as_ref()) {
self.transaction.table(real_name.clone())
self.transaction.table(self.table_cache, real_name.clone())
} else {
self.transaction.table(table_name.clone())
self.transaction.table(self.table_cache, table_name.clone())
}
.ok_or(DatabaseError::TableNotFound)?;

Expand Down Expand Up @@ -380,20 +383,24 @@ pub mod test {
use crate::errors::DatabaseError;
use crate::planner::LogicalPlan;
use crate::storage::rocksdb::RocksStorage;
use crate::storage::{Storage, Transaction};
use crate::storage::{Storage, TableCache, Transaction};
use crate::types::LogicalType::Integer;
use crate::utils::lru::ShardingLruCache;
use std::hash::RandomState;
use std::path::PathBuf;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use tempfile::TempDir;

pub(crate) fn build_test_catalog(
table_cache: &TableCache,
path: impl Into<PathBuf> + Send,
) -> Result<RocksStorage, DatabaseError> {
let storage = RocksStorage::new(path)?;
let mut transaction = storage.transaction()?;

let _ = transaction.create_table(
table_cache,
Arc::new("t1".to_string()),
vec![
ColumnCatalog::new(
Expand All @@ -411,6 +418,7 @@ pub mod test {
)?;

let _ = transaction.create_table(
table_cache,
Arc::new("t2".to_string()),
vec![
ColumnCatalog::new(
Expand All @@ -434,11 +442,17 @@ pub mod test {

pub fn select_sql_run<S: AsRef<str>>(sql: S) -> Result<LogicalPlan, DatabaseError> {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let storage = build_test_catalog(temp_dir.path())?;
let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let storage = build_test_catalog(&table_cache, temp_dir.path())?;
let transaction = storage.transaction()?;
let functions = Default::default();
let mut binder = Binder::new(
BinderContext::new(&transaction, &functions, Arc::new(AtomicUsize::new(0))),
BinderContext::new(
&table_cache,
&transaction,
&functions,
Arc::new(AtomicUsize::new(0)),
),
None,
);
let stmt = crate::parser::parse_sql(sql)?;
Expand Down
3 changes: 2 additions & 1 deletion src/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,13 +494,14 @@ impl<'a: 'b, 'b, T: Transaction> Binder<'a, 'b, T> {
_ => unimplemented!(),
};
let BinderContext {
table_cache,
transaction,
functions,
temp_table_id,
..
} = &self.context;
let mut binder = Binder::new(
BinderContext::new(*transaction, functions, temp_table_id.clone()),
BinderContext::new(table_cache, *transaction, functions, temp_table_id.clone()),
Some(self),
);
let mut right = binder.bind_single_table_ref(relation, Some(join_type))?;
Expand Down
75 changes: 54 additions & 21 deletions src/db.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::binder::{command_type, Binder, BinderContext, CommandType};
use crate::catalog::TableCatalog;
use crate::errors::DatabaseError;
use crate::execution::{build_write, try_collect};
use crate::expression::function::{FunctionSummary, ScalarFunctionImpl};
Expand All @@ -9,12 +10,14 @@ use crate::optimizer::rule::normalization::NormalizationRuleImpl;
use crate::parser::parse_sql;
use crate::planner::LogicalPlan;
use crate::storage::rocksdb::RocksStorage;
use crate::storage::{Storage, Transaction};
use crate::storage::{StatisticsMetaCache, Storage, TableCache, Transaction};
use crate::types::tuple::{SchemaRef, Tuple};
use crate::udf::current_date::CurrentDate;
use crate::utils::lru::ShardingLruCache;
use ahash::HashMap;
use parking_lot::{ArcRwLockReadGuard, ArcRwLockWriteGuard, RawRwLock, RwLock};
use sqlparser::ast::Statement;
use std::hash::RandomState;
use std::path::PathBuf;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
Expand Down Expand Up @@ -51,19 +54,25 @@ impl DataBaseBuilder {
self = self.register_function(CurrentDate::new());

let storage = RocksStorage::new(self.path)?;
let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);
let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new())?);

Ok(Database {
storage,
functions: Arc::new(self.functions),
mdl: Arc::new(RwLock::new(())),
meta_cache,
table_cache,
})
}
}

pub struct Database<S: Storage> {
pub storage: S,
pub(crate) storage: S,
functions: Arc<Functions>,
mdl: Arc<RwLock<()>>,
pub(crate) meta_cache: Arc<StatisticsMetaCache>,
pub(crate) table_cache: Arc<ShardingLruCache<String, TableCatalog>>,
}

impl<S: Storage> Database<S> {
Expand All @@ -80,18 +89,21 @@ impl<S: Storage> Database<S> {
} else {
MetaDataLock::Read(self.mdl.read_arc())
};
let transaction = self.storage.transaction()?;
let plan = Self::build_plan(stmt, &transaction, &self.functions)?;

Self::run_volcano(transaction, plan)
}
let mut transaction = self.storage.transaction()?;
let mut plan = Self::build_plan(
stmt,
&self.table_cache,
&self.meta_cache,
&transaction,
&self.functions,
)?;

pub(crate) fn run_volcano(
mut transaction: <S as Storage>::TransactionType<'_>,
mut plan: LogicalPlan,
) -> Result<(SchemaRef, Vec<Tuple>), DatabaseError> {
let schema = plan.output_schema().clone();
let iterator = build_write(plan, &mut transaction);
let iterator = build_write(
plan,
(&self.table_cache, &self.meta_cache),
&mut transaction,
);
let tuples = try_collect(iterator)?;

transaction.commit()?;
Expand All @@ -107,16 +119,25 @@ impl<S: Storage> Database<S> {
inner: transaction,
functions: self.functions.clone(),
_guard: guard,
meta_cache: self.meta_cache.clone(),
table_cache: self.table_cache.clone(),
})
}

pub(crate) fn build_plan(
stmt: &Statement,
table_cache: &TableCache,
meta_cache: &StatisticsMetaCache,
transaction: &<S as Storage>::TransactionType<'_>,
functions: &Functions,
) -> Result<LogicalPlan, DatabaseError> {
let mut binder = Binder::new(
BinderContext::new(transaction, functions, Arc::new(AtomicUsize::new(0))),
BinderContext::new(
table_cache,
transaction,
functions,
Arc::new(AtomicUsize::new(0)),
),
None,
);
/// Build a logical plan.
Expand All @@ -129,8 +150,8 @@ impl<S: Storage> Database<S> {
let source_plan = binder.bind(stmt)?;
// println!("source_plan plan: {:#?}", source_plan);

let best_plan =
Self::default_optimizer(source_plan).find_best(Some(&transaction.meta_loader()))?;
let best_plan = Self::default_optimizer(source_plan)
.find_best(Some(&transaction.meta_loader(meta_cache)))?;
// println!("best_plan plan: {:#?}", best_plan);

Ok(best_plan)
Expand Down Expand Up @@ -221,6 +242,8 @@ pub struct DBTransaction<'a, S: Storage + 'a> {
inner: S::TransactionType<'a>,
functions: Arc<Functions>,
_guard: ArcRwLockReadGuard<RawRwLock, ()>,
pub(crate) meta_cache: Arc<StatisticsMetaCache>,
pub(crate) table_cache: Arc<ShardingLruCache<String, TableCatalog>>,
}

impl<S: Storage> DBTransaction<'_, S> {
Expand All @@ -235,10 +258,16 @@ impl<S: Storage> DBTransaction<'_, S> {
"`DDL` is not allowed to execute within a transaction".to_string(),
));
}
let mut plan = Database::<S>::build_plan(stmt, &self.inner, &self.functions)?;
let mut plan = Database::<S>::build_plan(
stmt,
&self.table_cache,
&self.meta_cache,
&self.inner,
&self.functions,
)?;

let schema = plan.output_schema().clone();
let executor = build_write(plan, &mut self.inner);
let executor = build_write(plan, (&self.table_cache, &self.meta_cache), &mut self.inner);

Ok((schema, try_collect(executor)?))
}
Expand All @@ -258,7 +287,7 @@ mod test {
use crate::expression::ScalarExpression;
use crate::expression::{BinaryOperator, UnaryOperator};
use crate::function;
use crate::storage::{Storage, Transaction};
use crate::storage::{Storage, TableCache, Transaction};
use crate::types::evaluator::EvaluatorFactory;
use crate::types::tuple::{create_table, Tuple};
use crate::types::value::{DataValue, ValueRef};
Expand All @@ -268,7 +297,10 @@ mod test {
use std::sync::Arc;
use tempfile::TempDir;

fn build_table(mut transaction: impl Transaction) -> Result<(), DatabaseError> {
fn build_table(
table_cache: &TableCache,
mut transaction: impl Transaction,
) -> Result<(), DatabaseError> {
let columns = vec![
ColumnCatalog::new(
"c1".to_string(),
Expand All @@ -281,7 +313,8 @@ mod test {
ColumnDesc::new(LogicalType::Boolean, false, false, None),
),
];
let _ = transaction.create_table(Arc::new("t1".to_string()), columns, false)?;
let _ =
transaction.create_table(table_cache, Arc::new("t1".to_string()), columns, false)?;
transaction.commit()?;

Ok(())
Expand All @@ -292,7 +325,7 @@ mod test {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let database = DataBaseBuilder::path(temp_dir.path()).build()?;
let transaction = database.storage.transaction()?;
build_table(transaction)?;
build_table(&database.table_cache, transaction)?;

let batch = database.run("select * from t1")?;

Expand Down
Loading
Loading