From bc211d7bef755d1aa89196bcc588b1e785ec6bdb Mon Sep 17 00:00:00 2001 From: Kould <2435992353@qq.com> Date: Sun, 3 Dec 2023 18:43:16 +0800 Subject: [PATCH] Support `DBTransaction` For `DataBase` (#107) * feat: Support transaction running SQL * feat: add examples * code fmt --- .gitignore | 5 +- Cargo.toml | 2 +- README.md | 2 + examples/hello_world.rs | 51 ++++++++++++++++ examples/transaction.rs | 22 +++++++ src/catalog/column.rs | 2 +- src/db.rs | 126 +++++++++++++++++++++++++++++++++++----- src/storage/kip.rs | 4 +- 8 files changed, 196 insertions(+), 18 deletions(-) create mode 100644 examples/hello_world.rs create mode 100644 examples/transaction.rs diff --git a/.gitignore b/.gitignore index 8f3c9856..0b578254 100644 --- a/.gitignore +++ b/.gitignore @@ -18,4 +18,7 @@ Cargo.lock /.vscode /.idea /.obsidian -.DS_Store \ No newline at end of file +.DS_Store + +/hello_world +/transaction \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index a3750e2f..61774a7c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,7 @@ ahash = "0.8.3" lazy_static = "1.4.0" comfy-table = "7.0.1" bytes = "1.5.0" -kip_db = "0.1.2-alpha.18" +kip_db = "0.1.2-alpha.19" rust_decimal = "1" csv = "1" regex = "1.10.2" diff --git a/README.md b/README.md index a017ec0c..439401dd 100755 --- a/README.md +++ b/README.md @@ -109,6 +109,8 @@ implement_from_tuple!(Post, ( } )); ``` +- MVCC Transaction + - Optimistic - SQL field options - not null - null diff --git a/examples/hello_world.rs b/examples/hello_world.rs new file mode 100644 index 00000000..676d5c4e --- /dev/null +++ b/examples/hello_world.rs @@ -0,0 +1,51 @@ +use itertools::Itertools; +use kip_sql::db::{Database, DatabaseError}; +use kip_sql::implement_from_tuple; +use kip_sql::types::tuple::Tuple; +use kip_sql::types::value::DataValue; +use kip_sql::types::LogicalType; + +#[derive(Default, Debug, PartialEq)] +struct MyStruct { + pub c1: i32, + pub c2: String, +} + +implement_from_tuple!( + MyStruct, ( + c1: i32 => |inner: &mut MyStruct, value| { + if let DataValue::Int32(Some(val)) = value { + inner.c1 = val; + } + }, + c2: String => |inner: &mut MyStruct, value| { + if let DataValue::Utf8(Some(val)) = value { + inner.c2 = val; + } + } + ) +); + +#[tokio::main] +async fn main() -> Result<(), DatabaseError> { + let database = Database::with_kipdb("./hello_world").await?; + + let _ = database + .run("create table if not exists my_struct (c1 int primary key, c2 int)") + .await?; + let _ = database + .run("insert into my_struct values(0, 0), (1, 1)") + .await?; + let tuples = database + .run("select * from my_struct") + .await? + .into_iter() + .map(MyStruct::from) + .collect_vec(); + + println!("{:#?}", tuples); + + let _ = database.run("drop table my_struct").await?; + + Ok(()) +} diff --git a/examples/transaction.rs b/examples/transaction.rs new file mode 100644 index 00000000..a3d90573 --- /dev/null +++ b/examples/transaction.rs @@ -0,0 +1,22 @@ +use kip_sql::db::{Database, DatabaseError}; + +#[tokio::main] +async fn main() -> Result<(), DatabaseError> { + let database = Database::with_kipdb("./transaction").await?; + let mut tx_1 = database.new_transaction().await?; + + let _ = tx_1 + .run("create table if not exists t1 (c1 int primary key, c2 int)") + .await?; + let _ = tx_1.run("insert into t1 values(0, 0), (1, 1)").await?; + + assert!(database.run("select * from t1").await.is_err()); + + tx_1.commit().await?; + + assert!(database.run("select * from t1").await.is_ok()); + + let _ = database.run("drop table t1").await?; + + Ok(()) +} diff --git a/src/catalog/column.rs b/src/catalog/column.rs index be603484..563f68d2 100644 --- a/src/catalog/column.rs +++ b/src/catalog/column.rs @@ -60,7 +60,7 @@ impl ColumnCatalog { self.summary.id } - pub(crate) fn name(&self) -> &str { + pub fn name(&self) -> &str { &self.summary.name } diff --git a/src/db.rs b/src/db.rs index af03501e..2a7a1670 100644 --- a/src/db.rs +++ b/src/db.rs @@ -3,7 +3,7 @@ use std::cell::RefCell; use std::path::PathBuf; use crate::binder::{BindError, Binder, BinderContext}; -use crate::execution::executor::{build, try_collect}; +use crate::execution::executor::{build, try_collect, BoxedExecutor}; use crate::execution::ExecutorError; use crate::optimizer::heuristic::batch::HepBatchStrategy; use crate::optimizer::heuristic::optimizer::HepOptimizer; @@ -16,7 +16,7 @@ use crate::storage::{Storage, StorageError, Transaction}; use crate::types::tuple::Tuple; pub struct Database { - pub storage: S, + pub(crate) storage: S, } impl Database { @@ -37,14 +37,35 @@ impl Database { /// Run SQL queries. pub async fn run(&self, sql: &str) -> Result, DatabaseError> { let transaction = self.storage.transaction().await?; + let transaction = RefCell::new(transaction); + let mut stream = Self::_run(sql, &transaction)?; + let tuples = try_collect(&mut stream).await?; + + transaction.into_inner().commit().await?; + + Ok(tuples) + } + + pub async fn new_transaction(&self) -> Result, DatabaseError> { + let transaction = self.storage.transaction().await?; + + Ok(DBTransaction { + inner: RefCell::new(transaction), + }) + } + + fn _run( + sql: &str, + transaction: &RefCell<::TransactionType>, + ) -> Result { // parse let stmts = parse_sql(sql)?; - if stmts.is_empty() { - return Ok(vec![]); + return Err(DatabaseError::EmptyStatement); } - let binder = Binder::new(BinderContext::new(&transaction)); - + let binder = Binder::new(BinderContext::new(unsafe { + transaction.as_ptr().as_ref().unwrap() + })); /// Build a logical plan. /// /// SELECT a,b FROM t1 ORDER BY a LIMIT 1; @@ -58,13 +79,7 @@ impl Database { let best_plan = Self::default_optimizer(source_plan).find_best()?; // println!("best_plan plan: {:#?}", best_plan); - let transaction = RefCell::new(transaction); - let mut stream = build(best_plan, &transaction); - let tuples = try_collect(&mut stream).await?; - - transaction.into_inner().commit().await?; - - Ok(tuples) + Ok(build(best_plan, &transaction)) } fn default_optimizer(source_plan: LogicalPlan) -> HepOptimizer { @@ -105,8 +120,28 @@ impl Database { } } +pub struct DBTransaction { + inner: RefCell, +} + +impl DBTransaction { + pub async fn run(&mut self, sql: &str) -> Result, DatabaseError> { + let mut stream = Database::::_run(sql, &self.inner)?; + + Ok(try_collect(&mut stream).await?) + } + + pub async fn commit(self) -> Result<(), DatabaseError> { + self.inner.into_inner().commit().await?; + + Ok(()) + } +} + #[derive(thiserror::Error, Debug)] pub enum DatabaseError { + #[error("sql statement is empty")] + EmptyStatement, #[error("parse error: {0}")] Parse( #[source] @@ -147,6 +182,7 @@ mod test { use crate::db::{Database, DatabaseError}; use crate::storage::{Storage, StorageError, Transaction}; use crate::types::tuple::create_table; + use crate::types::value::DataValue; use crate::types::LogicalType; use std::sync::Arc; use tempfile::TempDir; @@ -185,6 +221,70 @@ mod test { Ok(()) } + #[tokio::test] + async fn test_transaction_sql() -> Result<(), DatabaseError> { + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); + let kipsql = Database::with_kipdb(temp_dir.path()).await?; + + let mut tx_1 = kipsql.new_transaction().await?; + let mut tx_2 = kipsql.new_transaction().await?; + + let _ = tx_1 + .run("create table t1 (a int primary key, b int)") + .await?; + let _ = tx_2 + .run("create table t1 (c int primary key, d int)") + .await?; + + let _ = tx_1.run("insert into t1 values(0, 0)").await?; + let _ = tx_1.run("insert into t1 values(1, 1)").await?; + + let _ = tx_2.run("insert into t1 values(2, 2)").await?; + let _ = tx_2.run("insert into t1 values(3, 3)").await?; + + let tuples_1 = tx_1.run("select * from t1").await?; + let tuples_2 = tx_2.run("select * from t1").await?; + + assert_eq!(tuples_1.len(), 2); + assert_eq!(tuples_2.len(), 2); + + assert_eq!( + tuples_1[0].values, + vec![ + Arc::new(DataValue::Int32(Some(0))), + Arc::new(DataValue::Int32(Some(0))) + ] + ); + assert_eq!( + tuples_1[1].values, + vec![ + Arc::new(DataValue::Int32(Some(1))), + Arc::new(DataValue::Int32(Some(1))) + ] + ); + + assert_eq!( + tuples_2[0].values, + vec![ + Arc::new(DataValue::Int32(Some(2))), + Arc::new(DataValue::Int32(Some(2))) + ] + ); + assert_eq!( + tuples_2[1].values, + vec![ + Arc::new(DataValue::Int32(Some(3))), + Arc::new(DataValue::Int32(Some(3))) + ] + ); + + tx_1.commit().await?; + + assert!(tx_2.commit().await.is_err()); + + Ok(()) + } + #[tokio::test] async fn test_crud_sql() -> Result<(), DatabaseError> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); diff --git a/src/storage/kip.rs b/src/storage/kip.rs index fae1a2df..d703d621 100644 --- a/src/storage/kip.rs +++ b/src/storage/kip.rs @@ -7,7 +7,7 @@ use crate::storage::{ use crate::types::index::{Index, IndexMeta, IndexMetaRef}; use crate::types::tuple::{Tuple, TupleId}; use kip_db::kernel::lsm::iterator::Iter as KipDBIter; -use kip_db::kernel::lsm::mvcc::TransactionIter; +use kip_db::kernel::lsm::mvcc::{CheckType, TransactionIter}; use kip_db::kernel::lsm::storage::Config; use kip_db::kernel::lsm::{mvcc, storage}; use kip_db::kernel::utils::lru_cache::ShardingLruCache; @@ -37,7 +37,7 @@ impl Storage for KipStorage { type TransactionType = KipTransaction; async fn transaction(&self) -> Result { - let tx = self.inner.new_transaction().await; + let tx = self.inner.new_transaction(CheckType::Optimistic).await; Ok(KipTransaction { tx,