Skip to content

Commit

Permalink
feat: Implement Codegen Executor (#117)
Browse files Browse the repository at this point in the history
* feat: Implement Codegen Executor

* feat: Implement Limit on Codegen Executor

* feat: Implement SimpleAgg on Codegen Executor

* feat: Implement Sort on Codegen Executor

* feat: Implement HashJoin on Codegen Executor

* fix: incorrect projection of fields with the same name in different tables

* perf: fusion loop for HashJoin

* feat: Implement HashAgg on Codegen Executor

* feat: new `DataBase::run_on_query` to choose Execute on Query

* feat: Implement IndexScan on Codegen Executor

* bench: added `QueryBench` to compare the execution performance of Volcano and Codegen

* feat: conditionally compile the `Codegen` executor as a feature

* fix: required feature on `test_crud_sql`
  • Loading branch information
KKould authored Jan 9, 2024
1 parent 92618d2 commit 890c5bb
Show file tree
Hide file tree
Showing 88 changed files with 2,026 additions and 418 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,6 @@ Cargo.lock
.DS_Store

/hello_world
/transaction
/transaction

query_bench_data/
19 changes: 17 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

[package]
name = "kip-sql"
version = "0.0.1-alpha.7"
version = "0.0.1-alpha.8"
edition = "2021"
authors = ["Kould <[email protected]>", "Xwg <[email protected]>"]
description = "build the SQL layer of KipDB database"
Expand All @@ -15,6 +15,17 @@ categories = ["development-tools", "database"]
[lib]
doctest = false

[features]
default = ["marcos"]
marcos = []
codegen_execute = ["dep:mlua"]

[[bench]]
name = "query_bench"
path = "benchmarks/query_benchmark.rs"
harness = false
required-features = ["codegen_execute"]

[dependencies]
sqlparser = "0.34.0"
thiserror = "1"
Expand All @@ -35,14 +46,18 @@ ahash = "0.8.3"
lazy_static = "1.4.0"
comfy-table = "7.0.1"
bytes = "1.5.0"
kip_db = "0.1.2-alpha.20"
kip_db = "0.1.2-alpha.21"
rust_decimal = "1"
csv = "1"
regex = "1.10.2"
clap = "4.4.11"

mlua = { version = "0.9.1", features = ["luajit", "vendored", "macros", "async"], optional = true }

[dev-dependencies]
cargo-tarpaulin = "0.27.1"
criterion = { version = "0.3.5", features = ["async_tokio", "html_reports"] }
indicatif = "0.17"
tokio-test = "0.4.2"
ctor = "0.2.0"
env_logger = "0.10"
Expand Down
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ Storage Support:
- KipDB

### Features
- ORM Mapping
- ORM Mapping: `features = ["marcos"]`
```rust
#[derive(Debug, Clone, Default)]
pub struct Post {
Expand All @@ -109,6 +109,9 @@ implement_from_tuple!(Post, (
}
));
```
- Execute
- Volcano
- Codegen on LuaJIT: `features = ["codegen_execute"]`
- MVCC Transaction
- Optimistic
- SQL field options
Expand Down
111 changes: 111 additions & 0 deletions benchmarks/query_benchmark.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
use criterion::{criterion_group, criterion_main, Criterion};
use indicatif::{ProgressBar, ProgressStyle};
use kip_sql::db::{Database, DatabaseError};
use kip_sql::execution::{codegen, volcano};
use kip_sql::storage::kip::KipStorage;
use kip_sql::storage::Storage;
use std::cell::RefCell;
use std::fs;
use std::sync::Arc;

/// Directory used to persist the benchmark database between runs.
const QUERY_BENCH_PATH: &str = "./query_bench_data";
/// Number of rows inserted into table `t1` before benchmarking.
const TABLE_ROW_NUM: u64 = 200_000;

/// Create the benchmark database and fill table `t1` with
/// `TABLE_ROW_NUM` rows of the form `(i, i + 1)`.
async fn init_query_bench() -> Result<(), DatabaseError> {
    let database = Database::with_kipdb(QUERY_BENCH_PATH).await.unwrap();
    database
        .run("create table t1 (c1 int primary key, c2 int)")
        .await?;

    // Progress bar so the long one-off insert phase gives visible feedback.
    let progress = ProgressBar::new(TABLE_ROW_NUM);
    let bar_style = ProgressStyle::default_bar()
        .template("[{elapsed_precise}] {bar:40.cyan/white} {pos}/{len} {msg}")
        .unwrap();
    progress.set_style(bar_style);

    for row in 0..TABLE_ROW_NUM {
        let statement = format!("insert into t1 values({}, {})", row, row + 1);
        let _ = database.run(statement.as_str()).await?;
        progress.set_position(row + 1);
    }
    progress.finish_with_message("Insert completed!");

    Ok(())
}

/// Return `true` when `path` exists on disk and refers to a directory.
fn path_exists_and_is_directory(path: &str) -> bool {
    fs::metadata(path).map(|meta| meta.is_dir()).unwrap_or(false)
}

/// Criterion benchmark: run `select * from t1` through both the Codegen
/// executor and the Volcano executor against the same pre-populated KipDB
/// store, so their scan throughput can be compared.
fn query_on_execute(c: &mut Criterion) {
    // Criterion drives async benchmarks through an explicitly built runtime.
    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(8)
        .enable_all()
        .build()
        .unwrap();
    let database = rt.block_on(async {
        // Populate the on-disk table only on the first run; subsequent runs
        // reuse the persisted `query_bench_data/` directory.
        if !path_exists_and_is_directory(QUERY_BENCH_PATH) {
            println!(
                "The table is not initialized and data insertion is started. => {}",
                TABLE_ROW_NUM
            );

            init_query_bench().await.unwrap();
        }

        Database::<KipStorage>::with_kipdb(QUERY_BENCH_PATH)
            .await
            .unwrap()
    });

    println!("Table initialization completed");

    // Build the physical plan once, outside the measured loop; the codegen
    // path shares its transaction across iterations via `Arc`.
    let (codegen_transaction, plan) = rt.block_on(async {
        let transaction = database.storage.transaction().await.unwrap();
        let (plan, _) =
            Database::<KipStorage>::build_plan("select * from t1", &transaction).unwrap();

        (Arc::new(transaction), plan)
    });

    c.bench_function("Codegen: select all", |b| {
        b.to_async(&rt).iter(|| async {
            let tuples = codegen::execute(plan.clone(), codegen_transaction.clone())
                .await
                .unwrap();
            // Sanity check: abort the benchmark if the row count is wrong.
            if tuples.len() as u64 != TABLE_ROW_NUM {
                panic!("{}", tuples.len());
            }
        })
    });

    // Same query through the Volcano (iterator-model) executor, which takes
    // its transaction by `RefCell` instead of `Arc`.
    let (volcano_transaction, plan) = rt.block_on(async {
        let transaction = database.storage.transaction().await.unwrap();
        let (plan, _) =
            Database::<KipStorage>::build_plan("select * from t1", &transaction).unwrap();

        (RefCell::new(transaction), plan)
    });

    c.bench_function("Volcano: select all", |b| {
        b.to_async(&rt).iter(|| async {
            let mut stream = volcano::build_stream(plan.clone(), &volcano_transaction);
            let tuples = volcano::try_collect(&mut stream).await.unwrap();
            // Sanity check: abort the benchmark if the row count is wrong.
            if tuples.len() as u64 != TABLE_ROW_NUM {
                panic!("{}", tuples.len());
            }
        })
    });
}

// Only 10 samples: each iteration scans the whole 200k-row table, so the
// default sample count would make the benchmark run far too long.
criterion_group!(
    name = query_benches;
    config = Criterion::default().sample_size(10);
    targets = query_on_execute
);

criterion_main!(query_benches,);
1 change: 1 addition & 0 deletions examples/hello_world.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ implement_from_tuple!(
)
);

#[cfg(feature = "marcos")]
#[tokio::main]
async fn main() -> Result<(), DatabaseError> {
let database = Database::with_kipdb("./hello_world").await?;
Expand Down
2 changes: 1 addition & 1 deletion src/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use super::Binder;

use crate::binder::BindError;
use crate::catalog::{ColumnCatalog, TableCatalog, TableName};
use crate::execution::executor::dql::join::joins_nullable;
use crate::execution::volcano::dql::join::joins_nullable;
use crate::expression::BinaryOperator;
use crate::planner::operator::join::JoinCondition;
use crate::planner::operator::sort::{SortField, SortOperator};
Expand Down
6 changes: 5 additions & 1 deletion src/catalog/column.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::catalog::TableName;
use crate::expression::ScalarExpression;
use serde::{Deserialize, Serialize};
use std::hash::Hash;
Expand All @@ -20,6 +21,7 @@ pub struct ColumnCatalog {
pub struct ColumnSummary {
pub id: Option<ColumnId>,
pub name: String,
pub table_name: Option<TableName>,
}

impl ColumnCatalog {
Expand All @@ -33,6 +35,7 @@ impl ColumnCatalog {
summary: ColumnSummary {
id: None,
name: column_name,
table_name: None,
},
nullable,
desc: column_desc,
Expand All @@ -43,8 +46,9 @@ impl ColumnCatalog {
pub(crate) fn new_dummy(column_name: String) -> ColumnCatalog {
ColumnCatalog {
summary: ColumnSummary {
id: Some(0),
id: None,
name: column_name,
table_name: None,
},
nullable: false,
desc: ColumnDesc::new(LogicalType::Varchar(None), false, false, None),
Expand Down
2 changes: 2 additions & 0 deletions src/catalog/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ impl TableCatalog {

let col_id = self.columns.len() as u32;

col.summary.table_name = Some(self.name.clone());
col.summary.id = Some(col_id);

self.column_idxs.insert(col.name().to_string(), col_id);
self.columns.insert(col_id, Arc::new(col));

Expand Down
Loading

0 comments on commit 890c5bb

Please sign in to comment.