Skip to content

Commit

Permalink
Support DBTransaction For DataBase (#107)
Browse files Browse the repository at this point in the history
* feat: Support transaction running SQL

* feat: add examples

* code fmt
  • Loading branch information
KKould authored Dec 3, 2023
1 parent e7dbac1 commit bc211d7
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 18 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,7 @@ Cargo.lock
/.vscode
/.idea
/.obsidian
.DS_Store
.DS_Store

/hello_world
/transaction
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ ahash = "0.8.3"
lazy_static = "1.4.0"
comfy-table = "7.0.1"
bytes = "1.5.0"
kip_db = "0.1.2-alpha.18"
kip_db = "0.1.2-alpha.19"
rust_decimal = "1"
csv = "1"
regex = "1.10.2"
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ implement_from_tuple!(Post, (
}
));
```
- MVCC Transaction
- Optimistic
- SQL field options
- not null
- null
Expand Down
51 changes: 51 additions & 0 deletions examples/hello_world.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use itertools::Itertools;
use kip_sql::db::{Database, DatabaseError};
use kip_sql::implement_from_tuple;
use kip_sql::types::tuple::Tuple;
use kip_sql::types::value::DataValue;
use kip_sql::types::LogicalType;

#[derive(Default, Debug, PartialEq)]
struct MyStruct {
pub c1: i32,
pub c2: String,
}

implement_from_tuple!(
MyStruct, (
c1: i32 => |inner: &mut MyStruct, value| {
if let DataValue::Int32(Some(val)) = value {
inner.c1 = val;
}
},
c2: String => |inner: &mut MyStruct, value| {
if let DataValue::Utf8(Some(val)) = value {
inner.c2 = val;
}
}
)
);

#[tokio::main]
async fn main() -> Result<(), DatabaseError> {
let database = Database::with_kipdb("./hello_world").await?;

let _ = database
.run("create table if not exists my_struct (c1 int primary key, c2 int)")
.await?;
let _ = database
.run("insert into my_struct values(0, 0), (1, 1)")
.await?;
let tuples = database
.run("select * from my_struct")
.await?
.into_iter()
.map(MyStruct::from)
.collect_vec();

println!("{:#?}", tuples);

let _ = database.run("drop table my_struct").await?;

Ok(())
}
22 changes: 22 additions & 0 deletions examples/transaction.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use kip_sql::db::{Database, DatabaseError};

#[tokio::main]
async fn main() -> Result<(), DatabaseError> {
let database = Database::with_kipdb("./transaction").await?;
let mut tx_1 = database.new_transaction().await?;

let _ = tx_1
.run("create table if not exists t1 (c1 int primary key, c2 int)")
.await?;
let _ = tx_1.run("insert into t1 values(0, 0), (1, 1)").await?;

assert!(database.run("select * from t1").await.is_err());

tx_1.commit().await?;

assert!(database.run("select * from t1").await.is_ok());

let _ = database.run("drop table t1").await?;

Ok(())
}
2 changes: 1 addition & 1 deletion src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl ColumnCatalog {
self.summary.id
}

pub(crate) fn name(&self) -> &str {
pub fn name(&self) -> &str {
&self.summary.name
}

Expand Down
126 changes: 113 additions & 13 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::cell::RefCell;
use std::path::PathBuf;

use crate::binder::{BindError, Binder, BinderContext};
use crate::execution::executor::{build, try_collect};
use crate::execution::executor::{build, try_collect, BoxedExecutor};
use crate::execution::ExecutorError;
use crate::optimizer::heuristic::batch::HepBatchStrategy;
use crate::optimizer::heuristic::optimizer::HepOptimizer;
Expand All @@ -16,7 +16,7 @@ use crate::storage::{Storage, StorageError, Transaction};
use crate::types::tuple::Tuple;

pub struct Database<S: Storage> {
pub storage: S,
pub(crate) storage: S,
}

impl Database<KipStorage> {
Expand All @@ -37,14 +37,35 @@ impl<S: Storage> Database<S> {
/// Run SQL queries.
pub async fn run(&self, sql: &str) -> Result<Vec<Tuple>, DatabaseError> {
let transaction = self.storage.transaction().await?;
let transaction = RefCell::new(transaction);
let mut stream = Self::_run(sql, &transaction)?;
let tuples = try_collect(&mut stream).await?;

transaction.into_inner().commit().await?;

Ok(tuples)
}

pub async fn new_transaction(&self) -> Result<DBTransaction<S>, DatabaseError> {
let transaction = self.storage.transaction().await?;

Ok(DBTransaction {
inner: RefCell::new(transaction),
})
}

fn _run(
sql: &str,
transaction: &RefCell<<S as Storage>::TransactionType>,
) -> Result<BoxedExecutor, DatabaseError> {
// parse
let stmts = parse_sql(sql)?;

if stmts.is_empty() {
return Ok(vec![]);
return Err(DatabaseError::EmptyStatement);
}
let binder = Binder::new(BinderContext::new(&transaction));

let binder = Binder::new(BinderContext::new(unsafe {
transaction.as_ptr().as_ref().unwrap()
}));
/// Build a logical plan.
///
/// SELECT a,b FROM t1 ORDER BY a LIMIT 1;
Expand All @@ -58,13 +79,7 @@ impl<S: Storage> Database<S> {
let best_plan = Self::default_optimizer(source_plan).find_best()?;
// println!("best_plan plan: {:#?}", best_plan);

let transaction = RefCell::new(transaction);
let mut stream = build(best_plan, &transaction);
let tuples = try_collect(&mut stream).await?;

transaction.into_inner().commit().await?;

Ok(tuples)
Ok(build(best_plan, &transaction))
}

fn default_optimizer(source_plan: LogicalPlan) -> HepOptimizer {
Expand Down Expand Up @@ -105,8 +120,28 @@ impl<S: Storage> Database<S> {
}
}

pub struct DBTransaction<S: Storage> {
inner: RefCell<S::TransactionType>,
}

impl<S: Storage> DBTransaction<S> {
pub async fn run(&mut self, sql: &str) -> Result<Vec<Tuple>, DatabaseError> {
let mut stream = Database::<S>::_run(sql, &self.inner)?;

Ok(try_collect(&mut stream).await?)
}

pub async fn commit(self) -> Result<(), DatabaseError> {
self.inner.into_inner().commit().await?;

Ok(())
}
}

#[derive(thiserror::Error, Debug)]
pub enum DatabaseError {
#[error("sql statement is empty")]
EmptyStatement,
#[error("parse error: {0}")]
Parse(
#[source]
Expand Down Expand Up @@ -147,6 +182,7 @@ mod test {
use crate::db::{Database, DatabaseError};
use crate::storage::{Storage, StorageError, Transaction};
use crate::types::tuple::create_table;
use crate::types::value::DataValue;
use crate::types::LogicalType;
use std::sync::Arc;
use tempfile::TempDir;
Expand Down Expand Up @@ -185,6 +221,70 @@ mod test {
Ok(())
}

#[tokio::test]
async fn test_transaction_sql() -> Result<(), DatabaseError> {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let kipsql = Database::with_kipdb(temp_dir.path()).await?;

let mut tx_1 = kipsql.new_transaction().await?;
let mut tx_2 = kipsql.new_transaction().await?;

let _ = tx_1
.run("create table t1 (a int primary key, b int)")
.await?;
let _ = tx_2
.run("create table t1 (c int primary key, d int)")
.await?;

let _ = tx_1.run("insert into t1 values(0, 0)").await?;
let _ = tx_1.run("insert into t1 values(1, 1)").await?;

let _ = tx_2.run("insert into t1 values(2, 2)").await?;
let _ = tx_2.run("insert into t1 values(3, 3)").await?;

let tuples_1 = tx_1.run("select * from t1").await?;
let tuples_2 = tx_2.run("select * from t1").await?;

assert_eq!(tuples_1.len(), 2);
assert_eq!(tuples_2.len(), 2);

assert_eq!(
tuples_1[0].values,
vec![
Arc::new(DataValue::Int32(Some(0))),
Arc::new(DataValue::Int32(Some(0)))
]
);
assert_eq!(
tuples_1[1].values,
vec![
Arc::new(DataValue::Int32(Some(1))),
Arc::new(DataValue::Int32(Some(1)))
]
);

assert_eq!(
tuples_2[0].values,
vec![
Arc::new(DataValue::Int32(Some(2))),
Arc::new(DataValue::Int32(Some(2)))
]
);
assert_eq!(
tuples_2[1].values,
vec![
Arc::new(DataValue::Int32(Some(3))),
Arc::new(DataValue::Int32(Some(3)))
]
);

tx_1.commit().await?;

assert!(tx_2.commit().await.is_err());

Ok(())
}

#[tokio::test]
async fn test_crud_sql() -> Result<(), DatabaseError> {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
Expand Down
4 changes: 2 additions & 2 deletions src/storage/kip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::storage::{
use crate::types::index::{Index, IndexMeta, IndexMetaRef};
use crate::types::tuple::{Tuple, TupleId};
use kip_db::kernel::lsm::iterator::Iter as KipDBIter;
use kip_db::kernel::lsm::mvcc::TransactionIter;
use kip_db::kernel::lsm::mvcc::{CheckType, TransactionIter};
use kip_db::kernel::lsm::storage::Config;
use kip_db::kernel::lsm::{mvcc, storage};
use kip_db::kernel::utils::lru_cache::ShardingLruCache;
Expand Down Expand Up @@ -37,7 +37,7 @@ impl Storage for KipStorage {
type TransactionType = KipTransaction;

async fn transaction(&self) -> Result<Self::TransactionType, StorageError> {
let tx = self.inner.new_transaction().await;
let tx = self.inner.new_transaction(CheckType::Optimistic).await;

Ok(KipTransaction {
tx,
Expand Down

0 comments on commit bc211d7

Please sign in to comment.