diff --git a/Cargo.toml b/Cargo.toml index c2d8bff5..3d089da5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ harness = false [dependencies] ahash = { version = "0.8.11" } +async-lock = { version = "3.3.0" } async-trait = { version = "0.1.77", optional = true } bincode = { version = "1.3.3" } bytes = { version = "1.5.0" } @@ -50,7 +51,6 @@ kip_db = { version = "0.1.2-alpha.25.fix2" } lazy_static = { version = "1.4.0" } log = { version = "0.4.21", optional = true } ordered-float = { version = "4.2.0" } -parking_lot = { version = "0.12.1" } petgraph = { version = "0.6.4" } pgwire = { version = "0.19.2", optional = true } rand = { version = "0.9.0-alpha.0" } diff --git a/src/binder/copy.rs b/src/binder/copy.rs index 58160289..64613a5c 100644 --- a/src/binder/copy.rs +++ b/src/binder/copy.rs @@ -97,10 +97,7 @@ impl<'a, T: Transaction> Binder<'a, T> { )) } } else { - Err(DatabaseError::InvalidTable(format!( - "not found table {}", - table_name - ))) + Err(DatabaseError::TableNotFound) } } } diff --git a/src/binder/mod.rs b/src/binder/mod.rs index 76836dbe..9919a09f 100644 --- a/src/binder/mod.rs +++ b/src/binder/mod.rs @@ -34,6 +34,32 @@ pub enum InputRefType { GroupBy, } +pub enum CommandType { + DQL, + DML, + DDL, +} + +pub fn command_type(stmt: &Statement) -> Result { + match stmt { + Statement::CreateTable { .. } + | Statement::CreateIndex { .. } + | Statement::AlterTable { .. } + | Statement::Drop { .. } => Ok(CommandType::DDL), + Statement::Query(_) + | Statement::Explain { .. } + | Statement::ExplainTable { .. } + | Statement::ShowTables { .. } => Ok(CommandType::DQL), + Statement::Analyze { .. } + | Statement::Truncate { .. } + | Statement::Update { .. } + | Statement::Delete { .. } + | Statement::Insert { .. } + | Statement::Copy { .. } => Ok(CommandType::DML), + stmt => Err(DatabaseError::UnsupportedStmt(stmt.to_string())), + } +} + // Tips: only query now! #[derive(Debug, Copy, Clone, Hash, Eq, PartialEq)] pub enum QueryBindStep { diff --git a/src/db.rs b/src/db.rs index ea67f3fe..98a9f8b0 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,9 +1,11 @@ use ahash::HashMap; +use async_lock::{RwLock, RwLockReadGuardArc, RwLockWriteGuardArc}; +use sqlparser::ast::Statement; use std::path::PathBuf; use std::sync::atomic::AtomicUsize; use std::sync::Arc; -use crate::binder::{Binder, BinderContext}; +use crate::binder::{command_type, Binder, BinderContext, CommandType}; use crate::errors::DatabaseError; use crate::execution::volcano::{build_write, try_collect}; use crate::expression::function::{FunctionSummary, ScalarFunctionImpl}; @@ -19,6 +21,12 @@ use crate::types::tuple::{SchemaRef, Tuple}; pub(crate) type Functions = HashMap>; +#[allow(dead_code)] +pub(crate) enum MetaDataLock { + Read(RwLockReadGuardArc<()>), + Write(RwLockWriteGuardArc<()>), +} + pub struct DataBaseBuilder { path: PathBuf, functions: Functions, @@ -45,6 +53,7 @@ impl DataBaseBuilder { Ok(Database { storage, functions: Arc::new(self.functions), + mdl: Arc::new(RwLock::new(())), }) } } @@ -52,6 +61,7 @@ impl DataBaseBuilder { pub struct Database { pub storage: S, functions: Arc, + mdl: Arc>, } impl Database { @@ -60,8 +70,19 @@ impl Database { &self, sql: T, ) -> Result<(SchemaRef, Vec), DatabaseError> { + // parse + let stmts = parse_sql(sql)?; + if stmts.is_empty() { + return Err(DatabaseError::EmptyStatement); + } + let stmt = &stmts[0]; + let _guard = if matches!(command_type(stmt)?, CommandType::DDL) { + MetaDataLock::Write(self.mdl.write_arc().await) + } else { + MetaDataLock::Read(self.mdl.read_arc().await) + }; let transaction = self.storage.transaction().await?; - let plan = Self::build_plan::(sql, &transaction, &self.functions)?; + let plan = Self::build_plan(stmt, &transaction, &self.functions)?; Self::run_volcano(transaction, plan).await } @@ -81,24 +102,21 @@ impl Database { } pub async fn new_transaction(&self) -> Result, DatabaseError> { + let guard = self.mdl.read_arc().await; let transaction = self.storage.transaction().await?; Ok(DBTransaction { inner: transaction, functions: self.functions.clone(), + _guard: guard, }) } - pub fn build_plan, T: Transaction>( - sql: V, + pub(crate) fn build_plan( + stmt: &Statement, transaction: &::TransactionType, functions: &Functions, ) -> Result { - // parse - let stmts = parse_sql(sql)?; - if stmts.is_empty() { - return Err(DatabaseError::EmptyStatement); - } let mut binder = Binder::new( BinderContext::new(transaction, functions, Arc::new(AtomicUsize::new(0))), None, @@ -110,7 +128,7 @@ impl Database { /// Sort(a) /// Limit(1) /// Project(a,b) - let source_plan = binder.bind(&stmts[0])?; + let source_plan = binder.bind(stmt)?; // println!("source_plan plan: {:#?}", source_plan); let best_plan = @@ -200,6 +218,7 @@ impl Database { pub struct DBTransaction { inner: S::TransactionType, functions: Arc, + _guard: RwLockReadGuardArc<()>, } impl DBTransaction { @@ -207,8 +226,18 @@ impl DBTransaction { &mut self, sql: T, ) -> Result<(SchemaRef, Vec), DatabaseError> { - let mut plan = - Database::::build_plan::(sql, &self.inner, &self.functions)?; + let stmts = parse_sql(sql)?; + if stmts.is_empty() { + return Err(DatabaseError::EmptyStatement); + } + let stmt = &stmts[0]; + if matches!(command_type(stmt)?, CommandType::DDL) { + return Err(DatabaseError::UnsupportedStmt( + "`DDL` is not allowed to execute within a transaction".to_string(), + )); + } + let mut plan = Database::::build_plan(stmt, &self.inner, &self.functions)?; + let schema = plan.output_schema().clone(); let mut stream = build_write(plan, &mut self.inner); @@ -298,20 +327,17 @@ mod test { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let fnck_sql = DataBaseBuilder::path(temp_dir.path()).build().await?; - let mut tx_1 = fnck_sql.new_transaction().await?; - let mut tx_2 = fnck_sql.new_transaction().await?; - - let _ = tx_1 + let _ = fnck_sql .run("create table t1 (a int primary key, b int)") .await?; - let _ = tx_2 - .run("create table t1 (c int primary key, d int)") - .await?; + + let mut tx_1 = fnck_sql.new_transaction().await?; + let mut tx_2 = fnck_sql.new_transaction().await?; let _ = tx_1.run("insert into t1 values(0, 0)").await?; let _ = tx_1.run("insert into t1 values(1, 1)").await?; - let _ = tx_2.run("insert into t1 values(2, 2)").await?; + let _ = tx_2.run("insert into t1 values(0, 0)").await?; let _ = tx_2.run("insert into t1 values(3, 3)").await?; let (_, tuples_1) = tx_1.run("select * from t1").await?; @@ -338,8 +364,8 @@ mod test { assert_eq!( tuples_2[0].values, vec![ - Arc::new(DataValue::Int32(Some(2))), - Arc::new(DataValue::Int32(Some(2))) + Arc::new(DataValue::Int32(Some(0))), + Arc::new(DataValue::Int32(Some(0))) ] ); assert_eq!( @@ -354,6 +380,10 @@ mod test { assert!(tx_2.commit().await.is_err()); + let mut tx_3 = fnck_sql.new_transaction().await?; + let res = tx_3.run("create table t2 (a int primary key, b int)").await; + assert!(res.is_err()); + Ok(()) } } diff --git a/src/storage/kip.rs b/src/storage/kip.rs index 323989ad..d71ee85e 100644 --- a/src/storage/kip.rs +++ b/src/storage/kip.rs @@ -23,6 +23,7 @@ use std::sync::Arc; pub struct KipStorage { pub inner: Arc, pub(crate) meta_cache: Arc, + pub(crate) table_cache: Arc>, } impl KipStorage { @@ -31,10 +32,12 @@ impl KipStorage { storage::KipStorage::open_with_config(Config::new(path).enable_level_0_memorization()) .await?; let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new()).unwrap()); + let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new()).unwrap()); Ok(KipStorage { inner: Arc::new(storage), meta_cache, + table_cache, }) } } @@ -47,7 +50,7 @@ impl Storage for KipStorage { Ok(KipTransaction { tx, - table_cache: ShardingLruCache::new(8, 2, RandomState::default())?, + table_cache: Arc::clone(&self.table_cache), meta_cache: self.meta_cache.clone(), }) } @@ -57,7 +60,7 @@ pub(crate) type StatisticsMetaCache = ShardingLruCache<(TableName, IndexId), Sta pub struct KipTransaction { tx: mvcc::Transaction, - table_cache: ShardingLruCache, + table_cache: Arc>, meta_cache: Arc, } diff --git a/tests/slt/basic_test.slt b/tests/slt/basic_test.slt index 34fff868..b584facf 100644 --- a/tests/slt/basic_test.slt +++ b/tests/slt/basic_test.slt @@ -86,6 +86,16 @@ select CAST(name AS BIGINT) from t statement ok select CAST(id AS VARCHAR) from t +statement ok +create table t1 (id int primary key, name VARCHAR NOT NULL) + +# issue: https://github.com/KipData/FnckSQL/issues/175 +statement error +select t.name from t1; + +statement ok +drop table t1 + statement ok drop table if exists t