Skip to content

Commit

Permalink
Refer to SparkSQL to implement Physical Select of CBO based on RBO. (#…
Browse files Browse the repository at this point in the history
…118)

* bench: added SQLite for comparative testing

* feat: impl Memo of CBO

* feat: impl Histogram of CBO

* feat: add TableMeta of `ShowTable`

* feat: impl Histogram load on `Memo::new` to calculate cost of `Expression`

* feat: impl `AnalyzeTable` to build the histogram of load `Histogram` on `Memo::new`

* feat: completed the integration of CBO into the optimizer (currently only single-column index selection is supported)

* perf: optimize row count estimation

- use a Count-Min Sketch to estimate equality conditions
- refer to BaikalDB for more accurate row-count estimation when a range condition intersects a histogram bucket's range

* style: Histogram -> ColumnMeta

* fix: the optimizer does not use CBO by default
  • Loading branch information
KKould authored Jan 28, 2024
1 parent 890c5bb commit ed4c7ee
Show file tree
Hide file tree
Showing 97 changed files with 3,497 additions and 526 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ Cargo.lock
/hello_world
/transaction

query_bench_data/
kipsql_bench
sqlite_bench
11 changes: 8 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

[package]
name = "kip-sql"
version = "0.0.1-alpha.8"
version = "0.0.1-alpha.9"
edition = "2021"
authors = ["Kould <[email protected]>", "Xwg <[email protected]>"]
description = "build the SQL layer of KipDB database"
Expand All @@ -24,7 +24,6 @@ codegen_execute = ["dep:mlua"]
name = "query_bench"
path = "benchmarks/query_benchmark.rs"
harness = false
required-features = ["codegen_execute"]

[dependencies]
sqlparser = "0.34.0"
Expand All @@ -46,11 +45,14 @@ ahash = "0.8.3"
lazy_static = "1.4.0"
comfy-table = "7.0.1"
bytes = "1.5.0"
kip_db = "0.1.2-alpha.21"
kip_db = "0.1.2-alpha.23.fix5"
rust_decimal = "1"
csv = "1"
regex = "1.10.2"
clap = "4.4.11"
rand = "0.8.5"
dirs = "5.0.1"
siphasher = { version = "0.3.11", features = ["serde"] }

mlua = { version = "0.9.1", features = ["luajit", "vendored", "macros", "async"], optional = true }

Expand All @@ -64,6 +66,9 @@ env_logger = "0.10"
paste = "^1.0"
rstest = "0.17"
tempfile = "3.0.7"
rand_distr = "0.4.3"

sqlite = "0.32.0"

[workspace]
members = [
Expand Down
58 changes: 21 additions & 37 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,6 @@ Embedded SQL DBMS
KipSQL is designed to allow small Rust projects to reduce external dependencies and get rid of heavy database maintenance,
so that the Rust application itself can provide SQL storage capabilities.


If you are a developer of the following applications, we very much welcome you to try using KipSQL
and provide your experience and opinions on using it.
- personal website
- desktop/mobile application
- learning database
- platform bot

Welcome to our website, powered by KipSQL: **http://www.kipdata.site/**

### Quick Started
Expand Down Expand Up @@ -84,31 +76,30 @@ Storage Support:
### Features
- ORM Mapping: `features = ["marcos"]`
```rust
#[derive(Debug, Clone, Default)]
pub struct Post {
pub post_title: String,
pub post_date: NaiveDateTime,
pub post_body: String,
#[derive(Default, Debug, PartialEq)]
struct MyStruct {
c1: i32,
c2: String,
}

implement_from_tuple!(Post, (
post_title: String => |post: &mut Post, value: DataValue| {
if let Some(title) = value.utf8() {
post.post_title = title;
}
},
post_date: NaiveDateTime => |post: &mut Post, value: DataValue| {
if let Some(date_time) = value.datetime() {
post.post_date = date_time;
}
},
post_body: String => |post: &mut Post, value: DataValue| {
if let Some(body) = value.utf8() {
post.post_body = body;
implement_from_tuple!(
MyStruct, (
c1: i32 => |inner: &mut MyStruct, value| {
if let DataValue::Int32(Some(val)) = value {
inner.c1 = val;
}
},
c2: String => |inner: &mut MyStruct, value| {
if let DataValue::Utf8(Some(val)) = value {
inner.c2 = val;
}
}
}
));
)
);
```
- Optimizer
- RBO
- CBO based on RBO(Physical Selection)
- Execute
- Volcano
- Codegen on LuaJIT: `features = ["codegen_execute"]`
Expand Down Expand Up @@ -165,6 +156,7 @@ implement_from_tuple!(Post, (
- [x] Insert Overwrite
- [x] Update
- [x] Delete
- [x] Analyze
- DataTypes
- Invalid
- SqlNull
Expand All @@ -182,14 +174,6 @@ implement_from_tuple!(Post, (
- Varchar
- Date
- DateTime
- Optimizer rules
- Limit Project Transpose
- Eliminate Limits
- Push Limit Through Join
- Push Limit Into Scan
- Combine Filters
- Column Pruning
- Collapse Project

## License

Expand Down
129 changes: 93 additions & 36 deletions benchmarks/query_benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
use criterion::{criterion_group, criterion_main, Criterion};
use indicatif::{ProgressBar, ProgressStyle};
use itertools::Itertools;
use kip_sql::db::{Database, DatabaseError};
use kip_sql::execution::{codegen, volcano};
use kip_sql::execution::volcano;
use kip_sql::storage::kip::KipStorage;
use kip_sql::storage::Storage;
use sqlite::Error;
use std::cell::RefCell;
use std::fs;
use std::path::Path;
use std::sync::Arc;

const QUERY_BENCH_PATH: &'static str = "./query_bench_data";
const QUERY_CASE: &'static str = "select * from t1 where c1 = 1000";
const QUERY_BENCH_KIPSQL_PATH: &'static str = "./kipsql_bench";
const QUERY_BENCH_SQLITE_PATH: &'static str = "./sqlite_bench";
const TABLE_ROW_NUM: u64 = 2_00_000;

async fn init_query_bench() -> Result<(), DatabaseError> {
let database = Database::with_kipdb(QUERY_BENCH_PATH).await.unwrap();
async fn init_kipsql_query_bench() -> Result<(), DatabaseError> {
let database = Database::with_kipdb(QUERY_BENCH_KIPSQL_PATH).await.unwrap();
database
.run("create table t1 (c1 int primary key, c2 int)")
.await?;
Expand All @@ -31,6 +36,29 @@ async fn init_query_bench() -> Result<(), DatabaseError> {
}
pb.finish_with_message("Insert completed!");

let _ = database.run("analyze table t1").await?;

Ok(())
}

fn init_sqlite_query_bench() -> Result<(), Error> {
let connection = sqlite::open(QUERY_BENCH_SQLITE_PATH.to_owned())?;

let _ = connection.execute("create table t1 (c1 int primary key, c2 int)")?;

let pb = ProgressBar::new(TABLE_ROW_NUM);
pb.set_style(
ProgressStyle::default_bar()
.template("[{elapsed_precise}] {bar:40.cyan/white} {pos}/{len} {msg}")
.unwrap(),
);

for i in 0..TABLE_ROW_NUM {
let _ = connection.execute(format!("insert into t1 values({}, {})", i, i + 1))?;
pb.set_position(i + 1);
}
pb.finish_with_message("Insert completed!");

Ok(())
}

Expand All @@ -48,56 +76,85 @@ fn query_on_execute(c: &mut Criterion) {
.build()
.unwrap();
let database = rt.block_on(async {
if !path_exists_and_is_directory(QUERY_BENCH_PATH) {
if !Path::new(QUERY_BENCH_SQLITE_PATH).exists() {
println!(
"SQLITE: The table is not initialized and data insertion is started. => {}",
TABLE_ROW_NUM
);

init_sqlite_query_bench().unwrap();
}
if !path_exists_and_is_directory(QUERY_BENCH_KIPSQL_PATH) {
println!(
"The table is not initialized and data insertion is started. => {}",
"KipSQL: The table is not initialized and data insertion is started. => {}",
TABLE_ROW_NUM
);

init_query_bench().await.unwrap();
init_kipsql_query_bench().await.unwrap();
}

Database::<KipStorage>::with_kipdb(QUERY_BENCH_PATH)
Database::<KipStorage>::with_kipdb(QUERY_BENCH_KIPSQL_PATH)
.await
.unwrap()
});

println!("Table initialization completed");

let (codegen_transaction, plan) = rt.block_on(async {
let transaction = database.storage.transaction().await.unwrap();
let (plan, _) =
Database::<KipStorage>::build_plan("select * from t1", &transaction).unwrap();

(Arc::new(transaction), plan)
});
#[cfg(feature = "codegen_execute")]
{
use kip_sql::execution::codegen;

let (codegen_transaction, plan) = rt.block_on(async {
let transaction = database.storage.transaction().await.unwrap();
let (plan, _) = Database::<KipStorage>::build_plan(QUERY_CASE, &transaction).unwrap();

(Arc::new(transaction), plan)
});

c.bench_function(format!("Codegen: {}", QUERY_CASE).as_str(), |b| {
b.to_async(&rt).iter(|| async {
let tuples = codegen::execute(plan.clone(), codegen_transaction.clone())
.await
.unwrap();
if tuples.len() as u64 != TABLE_ROW_NUM {
panic!("{}", tuples.len());
}
})
});

let (volcano_transaction, plan) = rt.block_on(async {
let transaction = database.storage.transaction().await.unwrap();
let (plan, _) = Database::<KipStorage>::build_plan(QUERY_CASE, &transaction).unwrap();

(RefCell::new(transaction), plan)
});

c.bench_function(format!("Volcano: {}", QUERY_CASE).as_str(), |b| {
b.to_async(&rt).iter(|| async {
let mut stream = volcano::build_stream(plan.clone(), &volcano_transaction);
let tuples = volcano::try_collect(&mut stream).await.unwrap();
if tuples.len() as u64 != TABLE_ROW_NUM {
panic!("{}", tuples.len());
}
})
});
}

c.bench_function("Codegen: select all", |b| {
c.bench_function(format!("KipSQL: {}", QUERY_CASE).as_str(), |b| {
b.to_async(&rt).iter(|| async {
let tuples = codegen::execute(plan.clone(), codegen_transaction.clone())
.await
.unwrap();
if tuples.len() as u64 != TABLE_ROW_NUM {
panic!("{}", tuples.len());
}
let _tuples = database.run(QUERY_CASE).await.unwrap();
})
});

let (volcano_transaction, plan) = rt.block_on(async {
let transaction = database.storage.transaction().await.unwrap();
let (plan, _) =
Database::<KipStorage>::build_plan("select * from t1", &transaction).unwrap();

(RefCell::new(transaction), plan)
});

c.bench_function("Volcano: select all", |b| {
let connection = sqlite::open(QUERY_BENCH_SQLITE_PATH.to_owned()).unwrap();
c.bench_function(format!("SQLite: {}", QUERY_CASE).as_str(), |b| {
b.to_async(&rt).iter(|| async {
let mut stream = volcano::build_stream(plan.clone(), &volcano_transaction);
let tuples = volcano::try_collect(&mut stream).await.unwrap();
if tuples.len() as u64 != TABLE_ROW_NUM {
panic!("{}", tuples.len());
}
let _tuples = connection
.prepare(QUERY_CASE)
.unwrap()
.into_iter()
.map(|row| row.unwrap())
.collect_vec();
})
});
}
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain
Original file line number Diff line number Diff line change
@@ -1 +1 @@
nightly
nightly-2024-01-18
2 changes: 2 additions & 0 deletions src/binder/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
column,
}),
childrens: vec![plan],
physical_option: None,
}
}
AlterTableOperation::DropColumn {
Expand All @@ -58,6 +59,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
column_name,
}),
childrens: vec![plan],
physical_option: None,
}
}
AlterTableOperation::DropPrimaryKey => todo!(),
Expand Down
42 changes: 42 additions & 0 deletions src/binder/analyze.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use crate::binder::{lower_case_name, split_name, BindError, Binder};
use crate::planner::operator::analyze::AnalyzeOperator;
use crate::planner::operator::scan::ScanOperator;
use crate::planner::operator::Operator;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use itertools::Itertools;
use sqlparser::ast::ObjectName;
use std::sync::Arc;

impl<'a, T: Transaction> Binder<'a, T> {
    /// Binds an `ANALYZE TABLE <name>` statement into a [`LogicalPlan`].
    ///
    /// The plan is an `Analyze` operator whose single child is a full table
    /// scan; only indexed columns are collected, since column statistics
    /// (histograms) are presumably built per index column — confirm against
    /// the `AnalyzeTable` executor.
    ///
    /// # Errors
    /// Returns [`BindError::InvalidTable`] when the table does not exist in
    /// the binder context; propagates errors from `add_bind_table`.
    pub(crate) fn bind_analyze(&mut self, name: &ObjectName) -> Result<LogicalPlan, BindError> {
        // Table names are case-insensitive: normalize, then strip qualifiers.
        let name = lower_case_name(name);
        let name = split_name(&name)?;
        let table_name = Arc::new(name.to_string());

        let table_catalog = self
            .context
            .table(table_name.clone())
            .cloned()
            .ok_or_else(|| BindError::InvalidTable(format!("bind table {}", name)))?;
        // Statistics are only gathered for indexed columns.
        // (`filter` replaces the former `filter_map(.. .then(|| ..))`,
        // which Clippy flags as an unnecessary lazy evaluation.)
        let columns = table_catalog
            .all_columns()
            .into_iter()
            .filter(|column| column.desc.is_index())
            .collect_vec();

        let scan_op = ScanOperator::build(table_name.clone(), &table_catalog);
        self.context
            .add_bind_table(table_name.clone(), table_catalog, None)?;

        Ok(LogicalPlan {
            operator: Operator::Analyze(AnalyzeOperator {
                table_name,
                columns,
            }),
            childrens: vec![scan_op],
            physical_option: None,
        })
    }
}
2 changes: 2 additions & 0 deletions src/binder/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
Ok(LogicalPlan {
operator: Operator::CopyToFile(CopyToFileOperator { source: ext_source }),
childrens: vec![],
physical_option: None,
})
} else {
// COPY <dest_table> FROM <source_file>
Expand All @@ -95,6 +96,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
table: table_name.to_string(),
}),
childrens: vec![],
physical_option: None,
})
}
} else {
Expand Down
Loading

0 comments on commit ed4c7ee

Please sign in to comment.