Skip to content

Commit

Permalink
Fix/serial ddl (#177)
Browse files Browse the repository at this point in the history
* fix: added `mdl` to control DDL serial execution

* perf: make txn share storage table catalog cache

* fix: code merge

* style: code fmt

* fix: rollback `analyze` on `ddl` to `dml`

---------

Co-authored-by: crwen <[email protected]>
  • Loading branch information
KKould and crwen authored Mar 25, 2024
1 parent b8268c5 commit 8fe905a
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 29 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ harness = false

[dependencies]
ahash = { version = "0.8.11" }
async-lock = { version = "3.3.0" }
async-trait = { version = "0.1.77", optional = true }
bincode = { version = "1.3.3" }
bytes = { version = "1.5.0" }
Expand All @@ -50,7 +51,6 @@ kip_db = { version = "0.1.2-alpha.25.fix2" }
lazy_static = { version = "1.4.0" }
log = { version = "0.4.21", optional = true }
ordered-float = { version = "4.2.0" }
parking_lot = { version = "0.12.1" }
petgraph = { version = "0.6.4" }
pgwire = { version = "0.19.2", optional = true }
rand = { version = "0.9.0-alpha.0" }
Expand Down
5 changes: 1 addition & 4 deletions src/binder/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
))
}
} else {
Err(DatabaseError::InvalidTable(format!(
"not found table {}",
table_name
)))
Err(DatabaseError::TableNotFound)
}
}
}
Expand Down
26 changes: 26 additions & 0 deletions src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,32 @@ pub enum InputRefType {
GroupBy,
}

/// Broad category of a SQL statement, used to decide how strictly its
/// execution must be serialized (DDL takes the metadata write lock,
/// DQL/DML only take the read lock).
pub enum CommandType {
// Read-only query (SELECT / EXPLAIN / SHOW ...).
DQL,
// Data mutation (INSERT / UPDATE / DELETE / COPY ...).
DML,
// Schema mutation (CREATE / ALTER / DROP ...); must run serially.
DDL,
}

/// Classify a parsed SQL statement into its command category.
///
/// Schema-mutating statements map to [`CommandType::DDL`], read-only
/// statements to [`CommandType::DQL`], and data-mutating statements to
/// [`CommandType::DML`]. Anything outside these groups is rejected with
/// [`DatabaseError::UnsupportedStmt`].
pub fn command_type(stmt: &Statement) -> Result<CommandType, DatabaseError> {
    let kind = match stmt {
        // Read-only queries and introspection.
        Statement::Query(_)
        | Statement::Explain { .. }
        | Statement::ExplainTable { .. }
        | Statement::ShowTables { .. } => CommandType::DQL,
        // Data mutation. NOTE(review): `Analyze` is deliberately grouped
        // with DML here (see commit message: "rollback `analyze` on `ddl`
        // to `dml`").
        Statement::Analyze { .. }
        | Statement::Truncate { .. }
        | Statement::Update { .. }
        | Statement::Delete { .. }
        | Statement::Insert { .. }
        | Statement::Copy { .. } => CommandType::DML,
        // Schema mutation — these require exclusive (serial) execution.
        Statement::CreateTable { .. }
        | Statement::CreateIndex { .. }
        | Statement::AlterTable { .. }
        | Statement::Drop { .. } => CommandType::DDL,
        other => return Err(DatabaseError::UnsupportedStmt(other.to_string())),
    };
    Ok(kind)
}

// Tips: only query now!
#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq)]
pub enum QueryBindStep {
Expand Down
74 changes: 52 additions & 22 deletions src/db.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use ahash::HashMap;
use async_lock::{RwLock, RwLockReadGuardArc, RwLockWriteGuardArc};
use sqlparser::ast::Statement;
use std::path::PathBuf;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

use crate::binder::{Binder, BinderContext};
use crate::binder::{command_type, Binder, BinderContext, CommandType};
use crate::errors::DatabaseError;
use crate::execution::volcano::{build_write, try_collect};
use crate::expression::function::{FunctionSummary, ScalarFunctionImpl};
Expand All @@ -19,6 +21,12 @@ use crate::types::tuple::{SchemaRef, Tuple};

pub(crate) type Functions = HashMap<FunctionSummary, Arc<dyn ScalarFunctionImpl>>;

// Holds either a shared or an exclusive guard over the database's metadata
// lock (`mdl`). DDL statements take `Write` so they execute serially;
// everything else takes `Read`. The variants are never read, only held for
// their Drop behavior — hence `dead_code` is allowed.
#[allow(dead_code)]
pub(crate) enum MetaDataLock {
// Shared guard: concurrent DQL/DML may proceed together.
Read(RwLockReadGuardArc<()>),
// Exclusive guard: a DDL statement blocks all other statements.
Write(RwLockWriteGuardArc<()>),
}

pub struct DataBaseBuilder {
path: PathBuf,
functions: Functions,
Expand All @@ -45,13 +53,15 @@ impl DataBaseBuilder {
Ok(Database {
storage,
functions: Arc::new(self.functions),
mdl: Arc::new(RwLock::new(())),
})
}
}

pub struct Database<S: Storage> {
pub storage: S,
functions: Arc<Functions>,
mdl: Arc<RwLock<()>>,
}

impl<S: Storage> Database<S> {
Expand All @@ -60,8 +70,19 @@ impl<S: Storage> Database<S> {
&self,
sql: T,
) -> Result<(SchemaRef, Vec<Tuple>), DatabaseError> {
// parse
let stmts = parse_sql(sql)?;
if stmts.is_empty() {
return Err(DatabaseError::EmptyStatement);
}
let stmt = &stmts[0];
let _guard = if matches!(command_type(stmt)?, CommandType::DDL) {
MetaDataLock::Write(self.mdl.write_arc().await)
} else {
MetaDataLock::Read(self.mdl.read_arc().await)
};
let transaction = self.storage.transaction().await?;
let plan = Self::build_plan::<T, S::TransactionType>(sql, &transaction, &self.functions)?;
let plan = Self::build_plan(stmt, &transaction, &self.functions)?;

Self::run_volcano(transaction, plan).await
}
Expand All @@ -81,24 +102,21 @@ impl<S: Storage> Database<S> {
}

pub async fn new_transaction(&self) -> Result<DBTransaction<S>, DatabaseError> {
let guard = self.mdl.read_arc().await;
let transaction = self.storage.transaction().await?;

Ok(DBTransaction {
inner: transaction,
functions: self.functions.clone(),
_guard: guard,
})
}

pub fn build_plan<V: AsRef<str>, T: Transaction>(
sql: V,
pub(crate) fn build_plan(
stmt: &Statement,
transaction: &<S as Storage>::TransactionType,
functions: &Functions,
) -> Result<LogicalPlan, DatabaseError> {
// parse
let stmts = parse_sql(sql)?;
if stmts.is_empty() {
return Err(DatabaseError::EmptyStatement);
}
let mut binder = Binder::new(
BinderContext::new(transaction, functions, Arc::new(AtomicUsize::new(0))),
None,
Expand All @@ -110,7 +128,7 @@ impl<S: Storage> Database<S> {
/// Sort(a)
/// Limit(1)
/// Project(a,b)
let source_plan = binder.bind(&stmts[0])?;
let source_plan = binder.bind(stmt)?;
// println!("source_plan plan: {:#?}", source_plan);

let best_plan =
Expand Down Expand Up @@ -200,15 +218,26 @@ impl<S: Storage> Database<S> {
pub struct DBTransaction<S: Storage> {
inner: S::TransactionType,
functions: Arc<Functions>,
_guard: RwLockReadGuardArc<()>,
}

impl<S: Storage> DBTransaction<S> {
pub async fn run<T: AsRef<str>>(
&mut self,
sql: T,
) -> Result<(SchemaRef, Vec<Tuple>), DatabaseError> {
let mut plan =
Database::<S>::build_plan::<T, S::TransactionType>(sql, &self.inner, &self.functions)?;
let stmts = parse_sql(sql)?;
if stmts.is_empty() {
return Err(DatabaseError::EmptyStatement);
}
let stmt = &stmts[0];
if matches!(command_type(stmt)?, CommandType::DDL) {
return Err(DatabaseError::UnsupportedStmt(
"`DDL` is not allowed to execute within a transaction".to_string(),
));
}
let mut plan = Database::<S>::build_plan(stmt, &self.inner, &self.functions)?;

let schema = plan.output_schema().clone();
let mut stream = build_write(plan, &mut self.inner);

Expand Down Expand Up @@ -298,20 +327,17 @@ mod test {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let fnck_sql = DataBaseBuilder::path(temp_dir.path()).build().await?;

let mut tx_1 = fnck_sql.new_transaction().await?;
let mut tx_2 = fnck_sql.new_transaction().await?;

let _ = tx_1
let _ = fnck_sql
.run("create table t1 (a int primary key, b int)")
.await?;
let _ = tx_2
.run("create table t1 (c int primary key, d int)")
.await?;

let mut tx_1 = fnck_sql.new_transaction().await?;
let mut tx_2 = fnck_sql.new_transaction().await?;

let _ = tx_1.run("insert into t1 values(0, 0)").await?;
let _ = tx_1.run("insert into t1 values(1, 1)").await?;

let _ = tx_2.run("insert into t1 values(2, 2)").await?;
let _ = tx_2.run("insert into t1 values(0, 0)").await?;
let _ = tx_2.run("insert into t1 values(3, 3)").await?;

let (_, tuples_1) = tx_1.run("select * from t1").await?;
Expand All @@ -338,8 +364,8 @@ mod test {
assert_eq!(
tuples_2[0].values,
vec![
Arc::new(DataValue::Int32(Some(2))),
Arc::new(DataValue::Int32(Some(2)))
Arc::new(DataValue::Int32(Some(0))),
Arc::new(DataValue::Int32(Some(0)))
]
);
assert_eq!(
Expand All @@ -354,6 +380,10 @@ mod test {

assert!(tx_2.commit().await.is_err());

let mut tx_3 = fnck_sql.new_transaction().await?;
let res = tx_3.run("create table t2 (a int primary key, b int)").await;
assert!(res.is_err());

Ok(())
}
}
7 changes: 5 additions & 2 deletions src/storage/kip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::sync::Arc;
pub struct KipStorage {
pub inner: Arc<storage::KipStorage>,
pub(crate) meta_cache: Arc<StatisticsMetaCache>,
pub(crate) table_cache: Arc<ShardingLruCache<String, TableCatalog>>,
}

impl KipStorage {
Expand All @@ -31,10 +32,12 @@ impl KipStorage {
storage::KipStorage::open_with_config(Config::new(path).enable_level_0_memorization())
.await?;
let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new()).unwrap());
let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new()).unwrap());

Ok(KipStorage {
inner: Arc::new(storage),
meta_cache,
table_cache,
})
}
}
Expand All @@ -47,7 +50,7 @@ impl Storage for KipStorage {

Ok(KipTransaction {
tx,
table_cache: ShardingLruCache::new(8, 2, RandomState::default())?,
table_cache: Arc::clone(&self.table_cache),
meta_cache: self.meta_cache.clone(),
})
}
Expand All @@ -57,7 +60,7 @@ pub(crate) type StatisticsMetaCache = ShardingLruCache<(TableName, IndexId), Sta

pub struct KipTransaction {
tx: mvcc::Transaction,
table_cache: ShardingLruCache<String, TableCatalog>,
table_cache: Arc<ShardingLruCache<String, TableCatalog>>,
meta_cache: Arc<StatisticsMetaCache>,
}

Expand Down
10 changes: 10 additions & 0 deletions tests/slt/basic_test.slt
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ select CAST(name AS BIGINT) from t
statement ok
select CAST(id AS VARCHAR) from t

statement ok
create table t1 (id int primary key, name VARCHAR NOT NULL)

# issue: https://github.com/KipData/FnckSQL/issues/175
statement error
select t.name from t1;

statement ok
drop table t1

statement ok
drop table if exists t

Expand Down

0 comments on commit 8fe905a

Please sign in to comment.