diff --git a/.gitignore b/.gitignore index 0b578254..5e475b7e 100644 --- a/.gitignore +++ b/.gitignore @@ -21,4 +21,6 @@ Cargo.lock .DS_Store /hello_world -/transaction \ No newline at end of file +/transaction + +query_bench_data/ \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index f37743a6..f93fdf2d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ [package] name = "kip-sql" -version = "0.0.1-alpha.7" +version = "0.0.1-alpha.8" edition = "2021" authors = ["Kould ", "Xwg "] description = "build the SQL layer of KipDB database" @@ -15,6 +15,17 @@ categories = ["development-tools", "database"] [lib] doctest = false +[features] +default = ["marcos"] +marcos = [] +codegen_execute = ["dep:mlua"] + +[[bench]] +name = "query_bench" +path = "benchmarks/query_benchmark.rs" +harness = false +required-features = ["codegen_execute"] + [dependencies] sqlparser = "0.34.0" thiserror = "1" @@ -35,14 +46,18 @@ ahash = "0.8.3" lazy_static = "1.4.0" comfy-table = "7.0.1" bytes = "1.5.0" -kip_db = "0.1.2-alpha.20" +kip_db = "0.1.2-alpha.21" rust_decimal = "1" csv = "1" regex = "1.10.2" clap = "4.4.11" +mlua = { version = "0.9.1", features = ["luajit", "vendored", "macros", "async"], optional = true } + [dev-dependencies] cargo-tarpaulin = "0.27.1" +criterion = { version = "0.3.5", features = ["async_tokio", "html_reports"] } +indicatif = "0.17" tokio-test = "0.4.2" ctor = "0.2.0" env_logger = "0.10" diff --git a/README.md b/README.md index 439401dd..2d10c0c1 100755 --- a/README.md +++ b/README.md @@ -82,7 +82,7 @@ Storage Support: - KipDB ### Features -- ORM Mapping +- ORM Mapping: `features = ["marcos"]` ```rust #[derive(Debug, Clone, Default)] pub struct Post { @@ -109,6 +109,9 @@ implement_from_tuple!(Post, ( } )); ``` +- Execute + - Volcano + - Codegen on LuaJIT: `features = ["codegen_execute"]` - MVCC Transaction - Optimistic - SQL field options diff --git a/benchmarks/query_benchmark.rs b/benchmarks/query_benchmark.rs new file mode 100644 index 00000000..903c0a04 --- /dev/null +++ b/benchmarks/query_benchmark.rs @@ -0,0 +1,111 @@ +use criterion::{criterion_group, criterion_main, Criterion}; +use indicatif::{ProgressBar, ProgressStyle}; +use kip_sql::db::{Database, DatabaseError}; +use kip_sql::execution::{codegen, volcano}; +use kip_sql::storage::kip::KipStorage; +use kip_sql::storage::Storage; +use std::cell::RefCell; +use std::fs; +use std::sync::Arc; + +const QUERY_BENCH_PATH: &'static str = "./query_bench_data"; +const TABLE_ROW_NUM: u64 = 2_00_000; + +async fn init_query_bench() -> Result<(), DatabaseError> { + let database = Database::with_kipdb(QUERY_BENCH_PATH).await.unwrap(); + database + .run("create table t1 (c1 int primary key, c2 int)") + .await?; + let pb = ProgressBar::new(TABLE_ROW_NUM); + pb.set_style( + ProgressStyle::default_bar() + .template("[{elapsed_precise}] {bar:40.cyan/white} {pos}/{len} {msg}") + .unwrap(), + ); + + for i in 0..TABLE_ROW_NUM { + let _ = database + .run(format!("insert into t1 values({}, {})", i, i + 1).as_str()) + .await?; + pb.set_position(i + 1); + } + pb.finish_with_message("Insert completed!"); + + Ok(()) +} + +fn path_exists_and_is_directory(path: &str) -> bool { + match fs::metadata(path) { + Ok(metadata) => metadata.is_dir(), + Err(_) => false, + } +} + +fn query_on_execute(c: &mut Criterion) { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(8) + .enable_all() + .build() + .unwrap(); + let database = rt.block_on(async { + if !path_exists_and_is_directory(QUERY_BENCH_PATH) { + println!( + "The table is 
not initialized; starting data insertion. => {}", TABLE_ROW_NUM ); init_query_bench().await.unwrap(); } Database::<KipStorage>::with_kipdb(QUERY_BENCH_PATH) .await .unwrap() }); println!("Table initialization completed"); let (codegen_transaction, plan) = rt.block_on(async { let transaction = database.storage.transaction().await.unwrap(); let (plan, _) = Database::<KipStorage>::build_plan("select * from t1", &transaction).unwrap(); (Arc::new(transaction), plan) }); c.bench_function("Codegen: select all", |b| { b.to_async(&rt).iter(|| async { let tuples = codegen::execute(plan.clone(), codegen_transaction.clone()) .await .unwrap(); if tuples.len() as u64 != TABLE_ROW_NUM { panic!("{}", tuples.len()); } }) }); let (volcano_transaction, plan) = rt.block_on(async { let transaction = database.storage.transaction().await.unwrap(); let (plan, _) = Database::<KipStorage>::build_plan("select * from t1", &transaction).unwrap(); (RefCell::new(transaction), plan) }); c.bench_function("Volcano: select all", |b| { b.to_async(&rt).iter(|| async { let mut stream = volcano::build_stream(plan.clone(), &volcano_transaction); let tuples = volcano::try_collect(&mut stream).await.unwrap(); if tuples.len() as u64 != TABLE_ROW_NUM { panic!("{}", tuples.len()); } }) }); } criterion_group!( name = query_benches; config = Criterion::default().sample_size(10); targets = query_on_execute ); criterion_main!(query_benches,); diff --git a/examples/hello_world.rs b/examples/hello_world.rs index 676d5c4e..d78012bd 100644 --- a/examples/hello_world.rs +++ b/examples/hello_world.rs @@ -26,6 +26,7 @@ implement_from_tuple!( ) ); +#[cfg(feature = "marcos")] #[tokio::main] async fn main() -> Result<(), DatabaseError> { let database = Database::with_kipdb("./hello_world").await?; diff --git a/src/binder/select.rs b/src/binder/select.rs index 2a8324ca..c77eb435 100644 --- a/src/binder/select.rs +++ b/src/binder/select.rs @@ -17,7 +17,7 @@ use super::Binder; use crate::binder::BindError; use crate::catalog::{ColumnCatalog, TableCatalog, TableName}; -use crate::execution::executor::dql::join::joins_nullable; +use crate::execution::volcano::dql::join::joins_nullable; use crate::expression::BinaryOperator; use crate::planner::operator::join::JoinCondition; use crate::planner::operator::sort::{SortField, SortOperator}; diff --git a/src/catalog/column.rs b/src/catalog/column.rs index 563f68d2..044a2dab 100644 --- a/src/catalog/column.rs +++ b/src/catalog/column.rs @@ -1,3 +1,4 @@ +use crate::catalog::TableName; use crate::expression::ScalarExpression; use serde::{Deserialize, Serialize}; use std::hash::Hash; @@ -20,6 +21,7 @@ pub struct ColumnCatalog { pub struct ColumnSummary { pub id: Option<ColumnId>, pub name: String, + pub table_name: Option<TableName>, } impl ColumnCatalog { @@ -33,6 +35,7 @@ impl ColumnCatalog { summary: ColumnSummary { id: None, name: column_name, + table_name: None, }, nullable, desc: column_desc, @@ -43,8 +46,9 @@ impl ColumnCatalog { pub(crate) fn new_dummy(column_name: String) -> ColumnCatalog { ColumnCatalog { summary: ColumnSummary { - id: Some(0), + id: None, name: column_name, + table_name: None, }, nullable: false, desc: ColumnDesc::new(LogicalType::Varchar(None), false, false, None), diff --git a/src/catalog/table.rs b/src/catalog/table.rs index 2dd7398d..01db1e93 100644 --- a/src/catalog/table.rs +++ b/src/catalog/table.rs @@ -58,7 +58,9 @@ impl TableCatalog { let col_id = self.columns.len() as u32; + col.summary.table_name =
Some(self.name.clone()); col.summary.id = Some(col_id); + self.column_idxs.insert(col.name().to_string(), col_id); self.columns.insert(col_id, Arc::new(col)); diff --git a/src/db.rs b/src/db.rs index 2a7a1670..1356636e 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,9 +1,10 @@ +use sqlparser::ast::Statement; use sqlparser::parser::ParserError; use std::cell::RefCell; use std::path::PathBuf; use crate::binder::{BindError, Binder, BinderContext}; -use crate::execution::executor::{build, try_collect, BoxedExecutor}; +use crate::execution::volcano::{build_stream, try_collect}; use crate::execution::ExecutorError; use crate::optimizer::heuristic::batch::HepBatchStrategy; use crate::optimizer::heuristic::optimizer::HepOptimizer; @@ -15,8 +16,14 @@ use crate::storage::kip::KipStorage; use crate::storage::{Storage, StorageError, Transaction}; use crate::types::tuple::Tuple; +#[derive(Copy, Clone)] +pub enum QueryExecute { + Volcano, + Codegen, +} + pub struct Database<S: Storage> { - pub(crate) storage: S, + pub storage: S, } impl Database<KipStorage> { @@ -28,6 +35,43 @@ } } +impl Database<KipStorage> { + pub async fn run_on_query( + &self, + sql: &str, + query_execute: QueryExecute, + ) -> Result<Vec<Tuple>, DatabaseError> { + match query_execute { + QueryExecute::Volcano => self.run(sql).await, + QueryExecute::Codegen => { + #[cfg(feature = "codegen_execute")] + { + use crate::execution::codegen::execute; + use std::sync::Arc; + + let transaction = self.storage.transaction().await?; + let (plan, statement) = Self::build_plan(sql, &transaction)?; + + if matches!(statement, Statement::Query(_)) { + let transaction = Arc::new(transaction); + + let tuples = execute(plan, transaction.clone()).await?; + Arc::into_inner(transaction).unwrap().commit().await?; + + Ok(tuples) + } else { + Self::run_volcano(transaction, plan).await + } + } + #[cfg(not(feature = "codegen_execute"))] + { + unreachable!("please enable the `codegen_execute` feature") + } + } + } + } +} + impl<S: Storage> Database<S> { /// Create a new Database instance. pub fn new(storage: S) -> Result<Self, DatabaseError> { @@ -37,8 +81,17 @@ /// Run SQL queries. pub async fn run(&self, sql: &str) -> Result<Vec<Tuple>, DatabaseError> { let transaction = self.storage.transaction().await?; + let (plan, _) = Self::build_plan(sql, &transaction)?; + + Self::run_volcano(transaction, plan).await + } + + pub(crate) async fn run_volcano( + transaction: <S as Storage>::TransactionType, + plan: LogicalPlan, + ) -> Result<Vec<Tuple>, DatabaseError> { let transaction = RefCell::new(transaction); - let mut stream = Self::_run(sql, &transaction)?; + let mut stream = build_stream(plan, &transaction); let tuples = try_collect(&mut stream).await?; transaction.into_inner().commit().await?; @@ -54,18 +107,16 @@ }) } - fn _run( + pub fn build_plan( sql: &str, - transaction: &RefCell<<S as Storage>::TransactionType>, - ) -> Result<BoxedExecutor, DatabaseError> { + transaction: &<S as Storage>::TransactionType, + ) -> Result<(LogicalPlan, Statement), DatabaseError> { // parse - let stmts = parse_sql(sql)?; + let mut stmts = parse_sql(sql)?; if stmts.is_empty() { return Err(DatabaseError::EmptyStatement); } - let binder = Binder::new(BinderContext::new(unsafe { - transaction.as_ptr().as_ref().unwrap() - })); + let binder = Binder::new(BinderContext::new(transaction)); /// Build a logical plan.
/// /// SELECT a,b FROM t1 ORDER BY a LIMIT 1; @@ -79,10 +130,10 @@ let best_plan = Self::default_optimizer(source_plan).find_best()?; // println!("best_plan plan: {:#?}", best_plan); - Ok(build(best_plan, &transaction)) + Ok((best_plan, stmts.remove(0))) } - fn default_optimizer(source_plan: LogicalPlan) -> HepOptimizer { + pub(crate) fn default_optimizer(source_plan: LogicalPlan) -> HepOptimizer { HepOptimizer::new(source_plan) .batch( "Column Pruning".to_string(), @@ -126,7 +177,9 @@ pub struct DBTransaction<S: Storage> { impl<S: Storage> DBTransaction<S> { pub async fn run(&mut self, sql: &str) -> Result<Vec<Tuple>, DatabaseError> { - let mut stream = Database::<S>::_run(sql, &self.inner)?; + let (plan, _) = + Database::<S>::build_plan(sql, unsafe { self.inner.as_ptr().as_ref().unwrap() })?; + let mut stream = build_stream(plan, &self.inner); Ok(try_collect(&mut stream).await?) } @@ -160,7 +213,7 @@ pub enum DatabaseError { #[from] StorageError, ), - #[error("executor error: {0}")] + #[error("volcano error: {0}")] ExecutorError( #[source] #[from] @@ -179,9 +232,9 @@ #[cfg(test)] mod test { use crate::catalog::{ColumnCatalog, ColumnDesc}; - use crate::db::{Database, DatabaseError}; + use crate::db::{Database, DatabaseError, QueryExecute}; use crate::storage::{Storage, StorageError, Transaction}; - use crate::types::tuple::create_table; + use crate::types::tuple::{create_table, Tuple}; use crate::types::value::DataValue; use crate::types::LogicalType; use std::sync::Arc; @@ -287,189 +340,328 @@ #[tokio::test] async fn test_crud_sql() -> Result<(), DatabaseError> { + let mut results_1 = _test_crud_sql(QueryExecute::Volcano).await?; + #[cfg(feature = "codegen_execute")] + { + let mut results_2 = _test_crud_sql(QueryExecute::Codegen).await?; + + assert_eq!(results_1.len(), results_2.len()); + + for i in 0..results_1.len() { + results_1[i].sort_by_key(|tuple: &Tuple| tuple.serialize_to()); + results_2[i].sort_by_key(|tuple: &Tuple| tuple.serialize_to()); + + if results_1[i] != results_2[i] { + panic!( + "Index: {i} Tuples do not match!
\n Volcano: \n{}\n Codegen: \n{}", + create_table(&results_1[i]), + create_table(&results_2[i]) + ); + } + } + } + + Ok(()) + } + + async fn _test_crud_sql(query_execute: QueryExecute) -> Result<Vec<Vec<Tuple>>, DatabaseError> { + let mut results = Vec::new(); let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let kipsql = Database::with_kipdb(temp_dir.path()).await?; - let _ = kipsql.run("create table t1 (a int primary key, b int unique null, k int, z varchar unique null)").await?; + let _ = kipsql.run_on_query("create table t1 (a int primary key, b int unique null, k int, z varchar unique null)", query_execute).await?; let _ = kipsql - .run("create table t2 (c int primary key, d int unsigned null, e datetime)") + .run_on_query( + "create table t2 (c int primary key, d int unsigned null, e datetime)", + query_execute, + ) .await?; - let _ = kipsql.run("insert into t1 (a, b, k, z) values (-99, 1, 1, 'k'), (-1, 2, 2, 'i'), (5, 3, 2, 'p'), (29, 4, 2, 'db')").await?; - let _ = kipsql.run("insert into t2 (d, c, e) values (2, 1, '2021-05-20 21:00:00'), (3, 4, '2023-09-10 00:00:00')").await?; + let _ = kipsql.run_on_query("insert into t1 (a, b, k, z) values (-99, 1, 1, 'k'), (-1, 2, 2, 'i'), (5, 3, 2, 'p'), (29, 4, 2, 'db')", query_execute).await?; + let _ = kipsql.run_on_query("insert into t2 (d, c, e) values (2, 1, '2021-05-20 21:00:00'), (3, 4, '2023-09-10 00:00:00')", query_execute).await?; let _ = kipsql - .run("create table t3 (a int primary key, b decimal(4,2))") + .run_on_query( + "create table t3 (a int primary key, b decimal(4,2))", + query_execute, + ) .await?; let _ = kipsql - .run("insert into t3 (a, b) values (1, 1111), (2, 2.01), (3, 3.00)") + .run_on_query( + "insert into t3 (a, b) values (1, 1111), (2, 2.01), (3, 3.00)", + query_execute, + ) .await?; let _ = kipsql - .run("insert into t3 (a, b) values (4, 4444), (5, 5222), (6, 1.00)") + .run_on_query( + "insert into t3 (a, b) values (4, 4444), (5, 5222), (6, 1.00)", + query_execute, + ) .await?; println!("show tables:"); - let tuples_show_tables = kipsql.run("show tables").await?; + let tuples_show_tables = kipsql.run_on_query("show tables", query_execute).await?; println!("{}", create_table(&tuples_show_tables)); + results.push(tuples_show_tables); println!("full t1:"); - let tuples_full_fields_t1 = kipsql.run("select * from t1").await?; + let tuples_full_fields_t1 = kipsql + .run_on_query("select * from t1", query_execute) + .await?; println!("{}", create_table(&tuples_full_fields_t1)); + results.push(tuples_full_fields_t1); println!("full t2:"); - let tuples_full_fields_t2 = kipsql.run("select * from t2").await?; + let tuples_full_fields_t2 = kipsql + .run_on_query("select * from t2", query_execute) + .await?; println!("{}", create_table(&tuples_full_fields_t2)); + results.push(tuples_full_fields_t2); println!("projection_and_filter:"); - let tuples_projection_and_filter = kipsql.run("select a from t1 where b > 1").await?; + let tuples_projection_and_filter = kipsql + .run_on_query("select a from t1 where b > 1", query_execute) + .await?; println!("{}", create_table(&tuples_projection_and_filter)); + results.push(tuples_projection_and_filter); println!("projection_and_sort:"); - let tuples_projection_and_sort = kipsql.run("select * from t1 order by a, b").await?; + let tuples_projection_and_sort = kipsql + .run_on_query("select * from t1 order by a, b", query_execute) + .await?; println!("{}", create_table(&tuples_projection_and_sort)); + results.push(tuples_projection_and_sort); println!("like t1 1:"); -
let tuples_like_1_t1 = kipsql.run("select * from t1 where z like '%k'").await?; + let tuples_like_1_t1 = kipsql + .run_on_query("select * from t1 where z like '%k'", query_execute) + .await?; println!("{}", create_table(&tuples_like_1_t1)); + results.push(tuples_like_1_t1); println!("like t1 2:"); - let tuples_like_2_t1 = kipsql.run("select * from t1 where z like '_b'").await?; + let tuples_like_2_t1 = kipsql + .run_on_query("select * from t1 where z like '_b'", query_execute) + .await?; println!("{}", create_table(&tuples_like_2_t1)); + results.push(tuples_like_2_t1); println!("not like t1:"); - let tuples_not_like_t1 = kipsql.run("select * from t1 where z not like '_b'").await?; + let tuples_not_like_t1 = kipsql + .run_on_query("select * from t1 where z not like '_b'", query_execute) + .await?; println!("{}", create_table(&tuples_not_like_t1)); + results.push(tuples_not_like_t1); println!("in t1:"); - let tuples_in_t1 = kipsql.run("select * from t1 where a in (5, 29)").await?; + let tuples_in_t1 = kipsql + .run_on_query("select * from t1 where a in (5, 29)", query_execute) + .await?; println!("{}", create_table(&tuples_in_t1)); + results.push(tuples_in_t1); println!("not in t1:"); let tuples_not_in_t1 = kipsql - .run("select * from t1 where a not in (5, 29)") + .run_on_query("select * from t1 where a not in (5, 29)", query_execute) .await?; println!("{}", create_table(&tuples_not_in_t1)); + results.push(tuples_not_in_t1); println!("limit:"); - let tuples_limit = kipsql.run("select * from t1 limit 1 offset 1").await?; + let tuples_limit = kipsql + .run_on_query("select * from t1 limit 1 offset 1", query_execute) + .await?; println!("{}", create_table(&tuples_limit)); + results.push(tuples_limit); println!("inner join:"); let tuples_inner_join = kipsql - .run("select * from t1 inner join t2 on a = c") + .run_on_query("select * from t1 inner join t2 on a = c", query_execute) .await?; println!("{}", create_table(&tuples_inner_join)); + results.push(tuples_inner_join); println!("left join:"); - let tuples_left_join = kipsql.run("select * from t1 left join t2 on a = c").await?; + let tuples_left_join = kipsql + .run_on_query("select * from t1 left join t2 on a = c", query_execute) + .await?; println!("{}", create_table(&tuples_left_join)); + results.push(tuples_left_join); println!("right join:"); let tuples_right_join = kipsql - .run("select * from t1 right join t2 on a = c") + .run_on_query("select * from t1 right join t2 on a = c", query_execute) .await?; println!("{}", create_table(&tuples_right_join)); + results.push(tuples_right_join); println!("full join:"); - let tuples_full_join = kipsql.run("select * from t1 full join t2 on a = c").await?; + let tuples_full_join = kipsql + .run_on_query("select * from t1 full join t2 on a = c", query_execute) + .await?; println!("{}", create_table(&tuples_full_join)); + results.push(tuples_full_join); println!("count agg:"); - let tuples_count_agg = kipsql.run("select count(d) from t2").await?; + let tuples_count_agg = kipsql + .run_on_query("select count(d) from t2", query_execute) + .await?; println!("{}", create_table(&tuples_count_agg)); + results.push(tuples_count_agg); println!("count wildcard agg:"); - let tuples_count_wildcard_agg = kipsql.run("select count(*) from t2").await?; + let tuples_count_wildcard_agg = kipsql + .run_on_query("select count(*) from t2", query_execute) + .await?; println!("{}", create_table(&tuples_count_wildcard_agg)); + results.push(tuples_count_wildcard_agg); println!("count distinct agg:"); - let 
tuples_count_distinct_agg = kipsql.run("select count(distinct d) from t2").await?; + let tuples_count_distinct_agg = kipsql + .run_on_query("select count(distinct d) from t2", query_execute) + .await?; println!("{}", create_table(&tuples_count_distinct_agg)); + results.push(tuples_count_distinct_agg); println!("sum agg:"); - let tuples_sum_agg = kipsql.run("select sum(d) from t2").await?; + let tuples_sum_agg = kipsql + .run_on_query("select sum(d) from t2", query_execute) + .await?; println!("{}", create_table(&tuples_sum_agg)); + results.push(tuples_sum_agg); println!("sum distinct agg:"); - let tuples_sum_distinct_agg = kipsql.run("select sum(distinct d) from t2").await?; + let tuples_sum_distinct_agg = kipsql + .run_on_query("select sum(distinct d) from t2", query_execute) + .await?; println!("{}", create_table(&tuples_sum_distinct_agg)); + results.push(tuples_sum_distinct_agg); println!("avg agg:"); - let tuples_avg_agg = kipsql.run("select avg(d) from t2").await?; + let tuples_avg_agg = kipsql + .run_on_query("select avg(d) from t2", query_execute) + .await?; println!("{}", create_table(&tuples_avg_agg)); + results.push(tuples_avg_agg); println!("min_max agg:"); - let tuples_min_max_agg = kipsql.run("select min(d), max(d) from t2").await?; + let tuples_min_max_agg = kipsql + .run_on_query("select min(d), max(d) from t2", query_execute) + .await?; println!("{}", create_table(&tuples_min_max_agg)); + results.push(tuples_min_max_agg); println!("group agg:"); let tuples_group_agg = kipsql - .run("select c, max(d) from t2 group by c having c = 1") + .run_on_query( + "select c, max(d) from t2 group by c having c = 1", + query_execute, + ) .await?; println!("{}", create_table(&tuples_group_agg)); println!("alias:"); - let tuples_group_agg = kipsql.run("select c as o from t2").await?; + let tuples_group_agg = kipsql + .run_on_query("select c as o from t2", query_execute) + .await?; println!("{}", create_table(&tuples_group_agg)); + results.push(tuples_group_agg); println!("alias agg:"); let tuples_group_agg = kipsql - .run("select c, max(d) as max_d from t2 group by c having c = 1") + .run_on_query( + "select c, max(d) as max_d from t2 group by c having c = 1", + query_execute, + ) .await?; println!("{}", create_table(&tuples_group_agg)); + results.push(tuples_group_agg); println!("time max:"); - let tuples_time_max = kipsql.run("select max(e) as max_time from t2").await?; + let tuples_time_max = kipsql + .run_on_query("select max(e) as max_time from t2", query_execute) + .await?; println!("{}", create_table(&tuples_time_max)); + results.push(tuples_time_max); println!("time where:"); let tuples_time_where_t2 = kipsql - .run("select (c + 1) from t2 where e > '2021-05-20'") + .run_on_query( + "select (c + 1) from t2 where e > '2021-05-20'", + query_execute, + ) .await?; println!("{}", create_table(&tuples_time_where_t2)); + results.push(tuples_time_where_t2); assert!(kipsql - .run("select max(d) from t2 group by c") + .run_on_query("select max(d) from t2 group by c", query_execute) .await .is_err()); println!("distinct t1:"); - let tuples_distinct_t1 = kipsql.run("select distinct b, k from t1").await?; + let tuples_distinct_t1 = kipsql + .run_on_query("select distinct b, k from t1", query_execute) + .await?; println!("{}", create_table(&tuples_distinct_t1)); + results.push(tuples_distinct_t1); println!("update t1 with filter:"); - let _ = kipsql.run("update t1 set b = 0 where b = 1").await?; + let _ = kipsql + .run_on_query("update t1 set b = 0 where b = 1", query_execute) + .await?; 
println!("after t1:"); - let update_after_full_t1 = kipsql.run("select * from t1").await?; + + let update_after_full_t1 = kipsql + .run_on_query("select * from t1", query_execute) + .await?; println!("{}", create_table(&update_after_full_t1)); + results.push(update_after_full_t1); println!("insert overwrite t1:"); let _ = kipsql - .run("insert overwrite t1 (a, b, k) values (-99, 1, 0)") + .run_on_query( + "insert overwrite t1 (a, b, k) values (-99, 1, 0)", + query_execute, + ) .await?; println!("after t1:"); - let insert_overwrite_after_full_t1 = kipsql.run("select * from t1").await?; + let insert_overwrite_after_full_t1 = kipsql + .run_on_query("select * from t1", query_execute) + .await?; println!("{}", create_table(&insert_overwrite_after_full_t1)); + results.push(insert_overwrite_after_full_t1); assert!(kipsql - .run("insert overwrite t1 (a, b, k) values (-1, 1, 0)") + .run_on_query( + "insert overwrite t1 (a, b, k) values (-1, 1, 0)", + query_execute + ) .await .is_err()); println!("delete t1 with filter:"); - let _ = kipsql.run("delete from t1 where b = 0").await?; + let _ = kipsql + .run_on_query("delete from t1 where b = 0", query_execute) + .await?; println!("after t1:"); - let delete_after_full_t1 = kipsql.run("select * from t1").await?; + let delete_after_full_t1 = kipsql + .run_on_query("select * from t1", query_execute) + .await?; println!("{}", create_table(&delete_after_full_t1)); + results.push(delete_after_full_t1); - println!("truncate t1:"); - let _ = kipsql.run("truncate t1").await?; + println!("trun_on_querycate t1:"); + let _ = kipsql.run_on_query("truncate t1", query_execute).await?; println!("drop t1:"); - let _ = kipsql.run("drop table t1").await?; + let _ = kipsql.run_on_query("drop table t1", query_execute).await?; println!("decimal:"); - let tuples_decimal = kipsql.run("select * from t3").await?; + let tuples_decimal = kipsql + .run_on_query("select * from t3", query_execute) + .await?; println!("{}", create_table(&tuples_decimal)); + results.push(tuples_decimal); - Ok(()) + Ok(results) } } diff --git a/src/execution/codegen/dql/aggregate/hash_agg.rs b/src/execution/codegen/dql/aggregate/hash_agg.rs new file mode 100644 index 00000000..53bb4b31 --- /dev/null +++ b/src/execution/codegen/dql/aggregate/hash_agg.rs @@ -0,0 +1,99 @@ +use crate::execution::codegen::CodeGenerator; +use crate::execution::volcano::dql::aggregate::hash_agg::HashAggStatus; +use crate::execution::ExecutorError; +use crate::expression::ScalarExpression; +use crate::impl_from_lua; +use crate::planner::operator::aggregate::AggregateOperator; +use crate::types::tuple::Tuple; +use mlua::prelude::{LuaResult, LuaValue}; +use mlua::{FromLua, Lua, UserData, UserDataMethods, Value}; +use std::mem; + +pub struct HashAgg { + id: i64, + agg_calls: Option>, + groupby_exprs: Option>, + is_produced: bool, +} + +impl From<(AggregateOperator, i64)> for HashAgg { + fn from( + ( + AggregateOperator { + agg_calls, + groupby_exprs, + .. 
+ }, + id, + ): (AggregateOperator, i64), + ) -> Self { + HashAgg { + id, + agg_calls: Some(agg_calls), + groupby_exprs: Some(groupby_exprs), + is_produced: false, + } + } +} + +impl UserData for HashAggStatus { + fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) { + methods.add_method_mut("update", |_, agg_status, tuple: Tuple| { + agg_status.update(tuple).unwrap(); + + Ok(()) + }); + methods.add_method_mut("to_tuples", |_, agg_status, ()| { + Ok(agg_status.to_tuples().unwrap()) + }); + } +} + +impl_from_lua!(HashAggStatus); + +impl CodeGenerator for HashAgg { + fn produce(&mut self, lua: &Lua, script: &mut String) -> Result<(), ExecutorError> { + if let (Some(agg_calls), Some(groupby_exprs)) = + (self.agg_calls.take(), self.groupby_exprs.take()) + { + let env = format!("hash_agg_{}", self.id); + lua.globals() + .set(env.as_str(), HashAggStatus::new(agg_calls, groupby_exprs))?; + + script.push_str( + format!( + r#" + for _, tuple in ipairs(results) do + {}:update(tuple) + end + + results = {{}} + + for index, tuple in ipairs({}:to_tuples()) do + index = index - 1 + "#, + env, env + ) + .as_str(), + ); + + self.is_produced = true; + } + + Ok(()) + } + + fn consume(&mut self, _: &Lua, script: &mut String) -> Result<(), ExecutorError> { + if mem::replace(&mut self.is_produced, false) { + script.push_str( + r#" + table.insert(results, tuple) + ::continue:: + end + "#, + ); + } + + Ok(()) + } +} diff --git a/src/execution/codegen/dql/aggregate/mod.rs b/src/execution/codegen/dql/aggregate/mod.rs new file mode 100644 index 00000000..ed3f0c84 --- /dev/null +++ b/src/execution/codegen/dql/aggregate/mod.rs @@ -0,0 +1,2 @@ +pub(crate) mod hash_agg; +pub(crate) mod simple_agg; diff --git a/src/execution/codegen/dql/aggregate/simple_agg.rs b/src/execution/codegen/dql/aggregate/simple_agg.rs new file mode 100644 index 00000000..05bc074e --- /dev/null +++ b/src/execution/codegen/dql/aggregate/simple_agg.rs @@ -0,0 +1,140 @@ +use crate::catalog::ColumnRef; +use crate::execution::codegen::CodeGenerator; +use crate::execution::volcano::dql::aggregate::{create_accumulators, Accumulator}; +use crate::execution::ExecutorError; +use crate::expression::ScalarExpression; +use crate::impl_from_lua; +use crate::planner::operator::aggregate::AggregateOperator; +use crate::types::tuple::Tuple; +use crate::types::value::ValueRef; +use itertools::Itertools; +use mlua::prelude::{LuaResult, LuaValue}; +use mlua::{FromLua, Lua, UserData, UserDataMethods, Value}; +use std::mem; + +pub struct SimpleAgg { + id: i64, + agg_calls: Option<Vec<ScalarExpression>>, + is_produced: bool, +} + +impl From<(AggregateOperator, i64)> for SimpleAgg { + fn from((AggregateOperator { agg_calls, ..
}, id): (AggregateOperator, i64)) -> Self { + SimpleAgg { + id, + agg_calls: Some(agg_calls), + is_produced: false, + } + } +} + +pub(crate) struct AggAccumulators { + agg_calls: Vec<ScalarExpression>, + accs: Vec<Box<dyn Accumulator>>, + columns: Vec<ColumnRef>, +} + +impl AggAccumulators { + pub(crate) fn new(agg_calls: Vec<ScalarExpression>) -> Self { + let accs = create_accumulators(&agg_calls); + let columns = agg_calls + .iter() + .map(|expr| expr.output_column()) + .collect_vec(); + + AggAccumulators { + agg_calls, + accs, + columns, + } + } +} + +impl UserData for AggAccumulators { + fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) { + methods.add_method_mut("update", |_, agg_accumulators, tuple: Tuple| { + if agg_accumulators.accs.is_empty() { + return Ok(()); + } + + let values: Vec<ValueRef> = agg_accumulators + .agg_calls + .iter() + .map(|expr| match expr { + ScalarExpression::AggCall { args, .. } => args[0].eval(&tuple), + _ => unreachable!(), + }) + .try_collect() + .unwrap(); + + for (acc, value) in agg_accumulators.accs.iter_mut().zip_eq(values.iter()) { + acc.update_value(value).unwrap(); + } + + Ok(()) + }); + methods.add_method_mut("to_tuple", |_, agg_accumulators, ()| { + let columns = mem::replace(&mut agg_accumulators.columns, vec![]); + let values: Vec<ValueRef> = agg_accumulators + .accs + .drain(..) + .into_iter() + .map(|acc| acc.evaluate()) + .try_collect() + .unwrap(); + + Ok(Tuple { + id: None, + columns, + values, + }) + }); + } +} + +impl_from_lua!(AggAccumulators); + +impl CodeGenerator for SimpleAgg { + fn produce(&mut self, lua: &Lua, script: &mut String) -> Result<(), ExecutorError> { + if let Some(agg_calls) = self.agg_calls.take() { + let env = format!("simple_agg_{}", self.id); + lua.globals() + .set(env.as_str(), AggAccumulators::new(agg_calls))?; + + script.push_str( + format!( + r#" + for _, tuple in ipairs(results) do + {}:update(tuple) + end + + results = {{}} + + for index, tuple in ipairs({{{}:to_tuple()}}) do + index = index - 1 + "#, + env, env + ) + .as_str(), + ); + + self.is_produced = true; + } + + Ok(()) + } + + fn consume(&mut self, _: &Lua, script: &mut String) -> Result<(), ExecutorError> { + if mem::replace(&mut self.is_produced, false) { + script.push_str( + r#" + table.insert(results, tuple) + ::continue:: + end + "#, + ); + } + + Ok(()) + } +} diff --git a/src/execution/codegen/dql/filter.rs b/src/execution/codegen/dql/filter.rs new file mode 100644 index 00000000..e8bcc0ef --- /dev/null +++ b/src/execution/codegen/dql/filter.rs @@ -0,0 +1,47 @@ +use crate::execution::codegen::CodeGenerator; +use crate::execution::ExecutorError; +use crate::expression::ScalarExpression; +use crate::planner::operator::filter::FilterOperator; +use mlua::Lua; + +pub struct Filter { + id: i64, + predicate: Option<ScalarExpression>, +} + +impl From<(FilterOperator, i64)> for Filter { + fn from((FilterOperator { predicate, ..
}, id): (FilterOperator, i64)) -> Self { + Filter { + id, + predicate: Some(predicate), + } + } +} + +impl CodeGenerator for Filter { + fn produce(&mut self, _: &Lua, _: &mut String) -> Result<(), ExecutorError> { + Ok(()) + } + + fn consume(&mut self, lua: &Lua, script: &mut String) -> Result<(), ExecutorError> { + if let Some(predicate) = self.predicate.take() { + let env = format!("predicate_{}", self.id); + lua.globals().set(env.as_str(), predicate)?; + + script.push_str( + format!( + r#" + if {}:is_filtering(tuple) then + index = index - 1 + goto continue + end + "#, + env + ) + .as_str(), + ) + } + + Ok(()) + } +} diff --git a/src/execution/codegen/dql/index_scan.rs b/src/execution/codegen/dql/index_scan.rs new file mode 100644 index 00000000..f6d9fc88 --- /dev/null +++ b/src/execution/codegen/dql/index_scan.rs @@ -0,0 +1,104 @@ +use crate::execution::codegen::{CodeGenerator, KipTransactionPtr}; +use crate::execution::ExecutorError; +use crate::impl_from_lua; +use crate::planner::operator::scan::ScanOperator; +use crate::storage::{Iter, Transaction}; +use crate::types::tuple::Tuple; +use mlua::prelude::{LuaResult, LuaValue}; +use mlua::{FromLua, Lua, UserData, UserDataMethods, Value}; +use std::sync::Arc; +use tokio::sync::mpsc; + +const DEFAULT_CHANNEL_BUF: usize = 5; + +pub(crate) struct IndexScan { + id: i64, + op: Option<ScanOperator>, +} + +impl From<(ScanOperator, i64)> for IndexScan { + fn from((op, id): (ScanOperator, i64)) -> Self { + IndexScan { id, op: Some(op) } + } +} + +pub(crate) struct KipChannelIndexNext(mpsc::Receiver<Tuple>); + +impl KipChannelIndexNext { + pub(crate) fn new(transaction: &KipTransactionPtr, op: ScanOperator) -> Self { + let (tx, rx) = mpsc::channel(DEFAULT_CHANNEL_BUF); + + let ScanOperator { + table_name, + columns, + limit, + index_by, + ..
+ } = op; + let inner = KipTransactionPtr(Arc::clone(&transaction.0)); + + if let Some((index_meta, binaries)) = index_by { + tokio::spawn(async move { + let mut iter = inner + .0 + .read_by_index(table_name, limit, columns, index_meta, binaries) + .unwrap(); + + while let Some(tuple) = iter.next_tuple().unwrap() { + if tx.send(tuple).await.is_err() { + break; + } + } + }); + } else { + unreachable!("`index_by` cannot be None") + } + + KipChannelIndexNext(rx) + } +} + +impl UserData for KipChannelIndexNext { + fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) { + methods.add_async_method_mut("next", |_, next, ()| async move { Ok(next.0.recv().await) }); + } +} + +impl_from_lua!(KipChannelIndexNext); + +impl CodeGenerator for IndexScan { + fn produce(&mut self, lua: &Lua, script: &mut String) -> Result<(), ExecutorError> { + if let Some(op) = self.op.take() { + let env = format!("scan_op_{}", self.id); + lua.globals().set(env.as_str(), op)?; + + script.push_str( + format!( + r#" + local index_scan_{} = transaction:new_index_scan({}) + local index = -1 + + for tuple in function() return index_scan_{}:next() end do + index = index + 1 + "#, + self.id, env, self.id + ) + .as_str(), + ) + } + + Ok(()) + } + + fn consume(&mut self, _: &Lua, script: &mut String) -> Result<(), ExecutorError> { + script.push_str( + r#" + table.insert(results, tuple) + ::continue:: + end + "#, + ); + + Ok(()) + } +} diff --git a/src/execution/codegen/dql/join/hash_join.rs b/src/execution/codegen/dql/join/hash_join.rs new file mode 100644 index 00000000..38a2eccc --- /dev/null +++ b/src/execution/codegen/dql/join/hash_join.rs @@ -0,0 +1,87 @@ +use crate::execution::codegen::CodeGenerator; +use crate::execution::volcano::dql::join::hash_join::HashJoinStatus; +use crate::execution::ExecutorError; +use crate::impl_from_lua; +use crate::planner::operator::join::JoinOperator; +use crate::types::tuple::Tuple; +use mlua::prelude::{LuaResult, LuaValue}; +use mlua::{FromLua, Lua, UserData, UserDataMethods, Value}; +use std::mem; + +pub struct HashJoin { + pub(crate) id: i64, + join_status: Option<HashJoinStatus>, + is_produced: bool, + + env: String, +} + +impl From<(JoinOperator, i64, String)> for HashJoin { + fn from((JoinOperator { on, join_type }, id, env): (JoinOperator, i64, String)) -> Self { + HashJoin { + id, + join_status: Some(HashJoinStatus::new(on, join_type)), + is_produced: false, + env, + } + } +} + +impl UserData for HashJoinStatus { + fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) { + methods.add_method_mut("left_build", |_, join_status, tuple: Tuple| { + join_status.left_build(tuple).unwrap(); + + Ok(()) + }); + methods.add_method_mut("right_probe", |_, join_status, tuple: Tuple| { + Ok(join_status.right_probe(tuple).unwrap()) + }); + methods.add_method_mut("drop_build", |_, join_status, ()| { + Ok(join_status.build_drop()) + }); + } +} + +impl_from_lua!(HashJoinStatus); + +impl CodeGenerator for HashJoin { + fn produce(&mut self, lua: &Lua, script: &mut String) -> Result<(), ExecutorError> { + if let Some(join_status) = self.join_status.take() { + lua.globals().set(self.env.as_str(), join_status)?; + + script.push_str( + format!( + r#" + for _, tuple in ipairs({}:drop_build()) do + table.insert(join_temp_{}, tuple) + end + + for index, tuple in ipairs(join_temp_{}) do + index = index - 1 + "#, + self.env, self.id, self.id + ) + .as_str(), + ); + + self.is_produced = true; + } + + Ok(()) + } + + fn consume(&mut self, _: &Lua, script: &mut String) -> Result<(), ExecutorError> { +
if mem::replace(&mut self.is_produced, false) { + script.push_str( + r#" + table.insert(results, tuple) + ::continue:: + end + "#, + ); + } + + Ok(()) + } +} diff --git a/src/execution/codegen/dql/join/mod.rs b/src/execution/codegen/dql/join/mod.rs new file mode 100644 index 00000000..7a8613ff --- /dev/null +++ b/src/execution/codegen/dql/join/mod.rs @@ -0,0 +1 @@ +pub(crate) mod hash_join; diff --git a/src/execution/codegen/dql/limit.rs b/src/execution/codegen/dql/limit.rs new file mode 100644 index 00000000..d99f0df3 --- /dev/null +++ b/src/execution/codegen/dql/limit.rs @@ -0,0 +1,54 @@ +use crate::execution::codegen::CodeGenerator; +use crate::execution::ExecutorError; +use crate::planner::operator::limit::LimitOperator; +use mlua::Lua; + +pub struct Limit { + _id: i64, + offset: Option<usize>, + limit: Option<usize>, +} + +impl From<(LimitOperator, i64)> for Limit { + fn from((LimitOperator { offset, limit }, id): (LimitOperator, i64)) -> Self { + Limit { + offset, + limit, + _id: id, + } + } +} + +impl CodeGenerator for Limit { + fn produce(&mut self, _: &Lua, _: &mut String) -> Result<(), ExecutorError> { + Ok(()) + } + + fn consume(&mut self, _: &Lua, script: &mut String) -> Result<(), ExecutorError> { + let Limit { offset, limit, .. } = self; + + if limit.is_some() && limit.unwrap() == 0 { + return Ok(()); + } + + let offset_val = offset.unwrap_or(0); + let offset_limit = offset_val + limit.unwrap_or(1) - 1; + + script.push_str( + format!( + r#" + if index < {} then + goto continue + end + if index > {} then + break + end + "#, + offset_val, offset_limit + ) + .as_str(), + ); + + Ok(()) + } +} diff --git a/src/execution/codegen/dql/mod.rs b/src/execution/codegen/dql/mod.rs new file mode 100644 index 00000000..648faa7c --- /dev/null +++ b/src/execution/codegen/dql/mod.rs @@ -0,0 +1,8 @@ +pub(crate) mod aggregate; +pub(crate) mod filter; +pub(crate) mod index_scan; +pub(crate) mod join; +pub(crate) mod limit; +pub(crate) mod projection; +pub(crate) mod seq_scan; +pub(crate) mod sort; diff --git a/src/execution/codegen/dql/projection.rs b/src/execution/codegen/dql/projection.rs new file mode 100644 index 00000000..0829bcf5 --- /dev/null +++ b/src/execution/codegen/dql/projection.rs @@ -0,0 +1,44 @@ +use crate::execution::codegen::CodeGenerator; +use crate::execution::ExecutorError; +use crate::expression::ScalarExpression; +use crate::planner::operator::project::ProjectOperator; +use mlua::Lua; + +pub struct Projection { + id: i64, + exprs: Option<Vec<ScalarExpression>>, +} + +impl From<(ProjectOperator, i64)> for Projection { + fn from((ProjectOperator { exprs }, id): (ProjectOperator, i64)) -> Self { + Projection { + id, + exprs: Some(exprs), + } + } +} + +impl CodeGenerator for Projection { + fn produce(&mut self, _: &Lua, _: &mut String) -> Result<(), ExecutorError> { + Ok(()) + } + + fn consume(&mut self, lua: &Lua, script: &mut String) -> Result<(), ExecutorError> { + if let Some(exprs) = self.exprs.take() { + let env = format!("project_exprs_{}", self.id); + lua.globals().set(env.as_str(), exprs)?; + + script.push_str( + format!( + r#" + tuple:projection({}) + "#, + env + ) + .as_str(), + ) + } + + Ok(()) + } +} diff --git a/src/execution/codegen/dql/seq_scan.rs b/src/execution/codegen/dql/seq_scan.rs new file mode 100644 index 00000000..6976d211 --- /dev/null +++ b/src/execution/codegen/dql/seq_scan.rs @@ -0,0 +1,99 @@ +use crate::execution::codegen::{CodeGenerator, KipTransactionPtr}; +use crate::execution::ExecutorError; +use crate::impl_from_lua; +use crate::planner::operator::scan::ScanOperator; +use
crate::storage::{Iter, Transaction}; +use crate::types::tuple::Tuple; +use mlua::prelude::{LuaResult, LuaValue}; +use mlua::{FromLua, Lua, UserData, UserDataMethods, Value}; +use std::sync::Arc; +use tokio::sync::mpsc; + +const DEFAULT_CHANNEL_BUF: usize = 5; + +pub(crate) struct SeqScan { + id: i64, + op: Option<ScanOperator>, +} + +impl From<(ScanOperator, i64)> for SeqScan { + fn from((op, id): (ScanOperator, i64)) -> Self { + SeqScan { id, op: Some(op) } + } +} + +pub(crate) struct KipChannelSeqNext(mpsc::Receiver<Tuple>); + +impl KipChannelSeqNext { + pub(crate) fn new(transaction: &KipTransactionPtr, op: ScanOperator) -> Self { + let (tx, rx) = mpsc::channel(DEFAULT_CHANNEL_BUF); + + let ScanOperator { + table_name, + columns, + limit, + .. + } = op; + let inner = KipTransactionPtr(Arc::clone(&transaction.0)); + + tokio::spawn(async move { + let mut iter = inner.0.read(table_name, limit, columns).unwrap(); + + while let Some(tuple) = iter.next_tuple().unwrap() { + if tx.send(tuple).await.is_err() { + break; + } + } + }); + + KipChannelSeqNext(rx) + } +} + +impl UserData for KipChannelSeqNext { + fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) { + methods.add_async_method_mut("next", |_, next, ()| async move { Ok(next.0.recv().await) }); + } +} + +impl UserData for ScanOperator {} + +impl_from_lua!(KipChannelSeqNext); +impl_from_lua!(ScanOperator); + +impl CodeGenerator for SeqScan { + fn produce(&mut self, lua: &Lua, script: &mut String) -> Result<(), ExecutorError> { + if let Some(op) = self.op.take() { + let env = format!("scan_op_{}", self.id); + lua.globals().set(env.as_str(), op)?; + + script.push_str( + format!( + r#" + local seq_scan_{} = transaction:new_seq_scan({}) + local index = -1 + + for tuple in function() return seq_scan_{}:next() end do + index = index + 1 + "#, + self.id, env, self.id + ) + .as_str(), + ) + } + + Ok(()) + } + + fn consume(&mut self, _: &Lua, script: &mut String) -> Result<(), ExecutorError> { + script.push_str( + r#" + table.insert(results, tuple) + ::continue:: + end + "#, + ); + + Ok(()) + } +} diff --git a/src/execution/codegen/dql/sort.rs b/src/execution/codegen/dql/sort.rs new file mode 100644 index 00000000..032d7669 --- /dev/null +++ b/src/execution/codegen/dql/sort.rs @@ -0,0 +1,68 @@ +use crate::execution::codegen::CodeGenerator; +use crate::execution::ExecutorError; +use crate::impl_from_lua; +use crate::planner::operator::sort::{SortField, SortOperator}; +use mlua::prelude::{LuaResult, LuaValue}; +use mlua::{FromLua, Lua, UserData, Value}; +use std::mem; + +pub struct Sort { + id: i64, + sort_fields: Option<Vec<SortField>>, + is_produced: bool, +} + +impl UserData for SortField {} + +impl_from_lua!(SortField); + +impl From<(SortOperator, i64)> for Sort { + fn from((SortOperator { sort_fields, ..
}, id): (SortOperator, i64)) -> Self { + Sort { + id, + sort_fields: Some(sort_fields), + is_produced: false, + } + } +} + +impl CodeGenerator for Sort { + fn produce(&mut self, lua: &Lua, script: &mut String) -> Result<(), ExecutorError> { + if let Some(sort_fields) = self.sort_fields.take() { + let env = format!("sort_fields_{}", self.id); + lua.globals().set(env.as_str(), sort_fields)?; + + script.push_str( + format!( + r#" + local sort_temp_{} = sort({}, results) + results = {{}} + + for index, tuple in ipairs(sort_temp_{}) do + index = index - 1 + "#, + self.id, env, self.id + ) + .as_str(), + ); + + self.is_produced = true; + } + + Ok(()) + } + + fn consume(&mut self, _: &Lua, script: &mut String) -> Result<(), ExecutorError> { + if mem::replace(&mut self.is_produced, false) { + script.push_str( + r#" + table.insert(results, tuple) + ::continue:: + end + "#, + ); + } + + Ok(()) + } +} diff --git a/src/execution/codegen/marcos.rs b/src/execution/codegen/marcos.rs new file mode 100644 index 00000000..cfa636af --- /dev/null +++ b/src/execution/codegen/marcos.rs @@ -0,0 +1,13 @@ +#[macro_export] +macro_rules! impl_from_lua { + ($ty:ty) => { + impl<'lua> FromLua<'lua> for $ty { + fn from_lua(value: LuaValue<'lua>, _: &'lua Lua) -> LuaResult<Self> { + match value { + Value::UserData(ud) => Ok(ud.take::<Self>()?), + _ => unreachable!(), + } + } + } + }; +} diff --git a/src/execution/codegen/mod.rs b/src/execution/codegen/mod.rs new file mode 100644 index 00000000..b8911f4f --- /dev/null +++ b/src/execution/codegen/mod.rs @@ -0,0 +1,354 @@ +mod dql; + +#[macro_use] +pub(crate) mod marcos; + +use crate::execution::codegen::dql::aggregate::hash_agg::HashAgg; +use crate::execution::codegen::dql::aggregate::simple_agg::SimpleAgg; +use crate::execution::codegen::dql::filter::Filter; +use crate::execution::codegen::dql::index_scan::{IndexScan, KipChannelIndexNext}; +use crate::execution::codegen::dql::join::hash_join::HashJoin; +use crate::execution::codegen::dql::limit::Limit; +use crate::execution::codegen::dql::projection::Projection; +use crate::execution::codegen::dql::seq_scan::{KipChannelSeqNext, SeqScan}; +use crate::execution::codegen::dql::sort::Sort; +use crate::execution::volcano::dql::sort::sort; +use crate::execution::ExecutorError; +use crate::expression::ScalarExpression; +use crate::planner::operator::scan::ScanOperator; +use crate::planner::operator::sort::SortField; +use crate::planner::operator::Operator; +use crate::planner::LogicalPlan; +use crate::storage::kip::KipTransaction; +use crate::types::tuple::Tuple; +use crate::types::value::DataValue; +use mlua::prelude::*; +use mlua::{UserData, UserDataMethods, UserDataRef, Value}; +use std::sync::atomic::{AtomicI64, Ordering}; +use std::sync::Arc; + +pub trait CodeGenerator { + fn produce(&mut self, lua: &Lua, script: &mut String) -> Result<(), ExecutorError>; + + fn consume(&mut self, lua: &Lua, script: &mut String) -> Result<(), ExecutorError>; +} + +impl UserData for Tuple { + fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) { + methods.add_async_method_mut( + "projection", + |_, tuple, exprs: Vec<UserDataRef<ScalarExpression>>| async move { + let mut columns = Vec::with_capacity(exprs.len()); + let mut values = Vec::with_capacity(exprs.len()); + + for expr in exprs.iter() { + values.push(expr.eval(&tuple).unwrap()); + columns.push(expr.output_column()); + } + + tuple.columns = columns; + tuple.values = values; + + Ok(()) + }, + ); + } +} + +impl UserData for ScalarExpression { + fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut
M) { + methods.add_async_method("eval", |_, expr, tuple: UserDataRef<Tuple>| async move { + Ok(ValuePtr(expr.eval(&tuple).unwrap())) + }); + methods.add_async_method( + "is_filtering", + |_, expr, tuple: UserDataRef<Tuple>| async move { + Ok(!matches!( + expr.eval(&tuple).unwrap().as_ref(), + DataValue::Boolean(Some(true)) + )) + }, + ); + } +} + +impl UserData for ValuePtr {} + +#[derive(Debug)] +pub(crate) struct ValuePtr(Arc<DataValue>); + +pub(crate) struct KipTransactionPtr(Arc<KipTransaction>); + +impl UserData for KipTransactionPtr { + fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) { + methods.add_async_method( + "new_seq_scan", + |_, transaction, op: ScanOperator| async move { + Ok(KipChannelSeqNext::new(transaction, op)) + }, + ); + methods.add_async_method( + "new_index_scan", + |_, transaction, op: ScanOperator| async move { + Ok(KipChannelIndexNext::new(transaction, op)) + }, + ); + } +} + +impl_from_lua!(Tuple); +impl_from_lua!(ScalarExpression); +impl_from_lua!(ValuePtr); +impl_from_lua!(KipTransactionPtr); + +pub async fn execute( + plan: LogicalPlan, + transaction: Arc<KipTransaction>, +) -> Result<Vec<Tuple>, ExecutorError> { + let lua = Lua::new(); + let mut script = String::new(); + + let func_sort = + lua.create_function(|_, (sort_fields, tuples): (Vec<SortField>, Vec<Tuple>)| { + Ok(sort(&sort_fields, tuples).unwrap()) + })?; + + lua.globals() + .set("transaction", KipTransactionPtr(transaction))?; + lua.globals().set("sort", func_sort)?; + + script.push_str( + r#" + local results = {} + "#, + ); + build_script(0, plan, &lua, &mut script, Box::new(|_, _| Ok(())))?; + script.push_str( + r#" + return results"#, + ); + // println!("Lua Script: \n{}", script); + + Ok(lua.load(script).eval_async().await?) +} + +macro_rules! consumption { + ($child_op_id:expr,$executor:expr, $childrens:expr, $lua:expr, $script:expr, $consume:expr) => { + build_script( + $child_op_id, + $childrens.remove(0), + $lua, + $script, + Box::new(move |lua, script| { + $executor.consume(lua, script)?; + $consume(lua, script)?; + + Ok(()) + }), + )?; + }; +} + +macro_rules!
materialize { + ($child_op_id:expr, $executor:expr, $childrens:expr, $lua:expr, $script:expr, $consume:expr) => { + build_script( + $child_op_id, + $childrens.remove(0), + $lua, + $script, + Box::new(move |_, _| Ok(())), + )?; + + $executor.produce($lua, $script)?; + $consume($lua, $script)?; + $executor.consume($lua, $script)?; + }; +} + +static OP_COUNTER: AtomicI64 = AtomicI64::new(0); + +pub fn build_script( + op_id: i64, + plan: LogicalPlan, + lua: &Lua, + script: &mut String, + consume: Box<dyn FnOnce(&Lua, &mut String) -> Result<(), ExecutorError>>, +) -> Result<(), ExecutorError> { + let LogicalPlan { + operator, + mut childrens, + } = plan; + + let func_op_id = || OP_COUNTER.fetch_add(1, Ordering::SeqCst); + + match operator { + Operator::Scan(op) => { + if op.index_by.is_some() { + let mut index = IndexScan::from((op, op_id)); + + index.produce(lua, script)?; + consume(lua, script)?; + index.consume(lua, script)?; + } else { + let mut seq_scan = SeqScan::from((op, op_id)); + + seq_scan.produce(lua, script)?; + consume(lua, script)?; + seq_scan.consume(lua, script)?; + } + } + Operator::Project(op) => { + let mut projection = Projection::from((op, op_id)); + + projection.produce(lua, script)?; + consumption!(func_op_id(), projection, childrens, lua, script, consume); + } + Operator::Filter(op) => { + let mut filter = Filter::from((op, op_id)); + + filter.produce(lua, script)?; + consumption!(func_op_id(), filter, childrens, lua, script, consume); + } + Operator::Limit(op) => { + let mut limit = Limit::from((op, op_id)); + + limit.produce(lua, script)?; + consumption!(func_op_id(), limit, childrens, lua, script, consume); + } + Operator::Aggregate(op) => { + if op.groupby_exprs.is_empty() { + let mut simple_agg = SimpleAgg::from((op, op_id)); + + materialize!(func_op_id(), simple_agg, childrens, lua, script, consume); + } else { + let mut hash_agg = HashAgg::from((op, op_id)); + + materialize!(func_op_id(), hash_agg, childrens, lua, script, consume); + } + } + Operator::Sort(op) => { + let mut sort = Sort::from((op, op_id)); + + materialize!(func_op_id(), sort, childrens, lua, script, consume); + } + Operator::Join(op) => { + let env = format!("hash_join_{}", op_id); + + script.push_str( + format!( + r#" + local join_temp_{op_id} = {{}} + "# + ) + .as_str(), + ); + + let insert_into_left = format!( + r#" + {}:left_build(tuple) + goto continue + "#, + env + ); + build_script( + func_op_id(), + childrens.remove(0), + lua, + script, + Box::new(move |_, script| { + script.push_str(insert_into_left.as_str()); + + Ok(()) + }), + )?; + + let insert_into_right = format!( + r#" + for _, tuple in ipairs({}:right_probe(tuple)) do + table.insert(join_temp_{op_id}, tuple) + end + goto continue + "#, + env + ); + build_script( + func_op_id(), + childrens.remove(0), + lua, + script, + Box::new(move |_, script| { + script.push_str(insert_into_right.as_str()); + + Ok(()) + }), + )?; + + let mut join = HashJoin::from((op, op_id, env)); + + join.produce(lua, script)?; + consume(lua, script)?; + join.consume(lua, script)?; + } + _ => unreachable!(), + } + + Ok(()) +} + +#[cfg(test)] +mod test { + use crate::binder::{Binder, BinderContext}; + use crate::db::{Database, DatabaseError}; + use crate::execution::codegen::execute; + use crate::parser::parse_sql; + use crate::storage::kip::KipStorage; + use crate::storage::Storage; + use crate::types::tuple::create_table; + use std::sync::Arc; + use tempfile::TempDir; + + #[tokio::test] + async fn test_scan() -> Result<(), DatabaseError> { + let temp_dir = TempDir::new().expect("unable to
create temporary working directory"); + + let database = Database::with_kipdb(temp_dir.path()).await?; + + database + .run("create table t1 (c1 int primary key, c2 int)") + .await?; + database + .run("insert into t1 values(0, 1), (2, 3), (4, 5), (6, 7), (8, 9), (10, 11), (12, 13)") + .await?; + database + .run("create table t2 (c3 int primary key, c4 int)") + .await?; + database + .run("insert into t2 values(0, 1), (2, 3), (4, 5), (6, 7), (8, 9), (10, 11), (12, 13)") + .await?; + + let transaction = database.storage.transaction().await?; + + // parse + let stmts = parse_sql( + "select t1.c1, sum(t1.c2), sum(t2.c3), sum(t2.c4) from t1 left join t2 on t1.c1 = t2.c3 and t1.c1 > 3 where t1.c1 > 0 group by t1.c1", + )?; + let binder = Binder::new(BinderContext::new(&transaction)); + /// Build a logical plan. + /// + /// SELECT a,b FROM t1 ORDER BY a LIMIT 1; + /// Scan(t1) + /// Sort(a) + /// Limit(1) + /// Project(a,b) + let source_plan = binder.bind(&stmts[0])?; + // println!("source_plan plan: {:#?}", source_plan); + + let best_plan = Database::::default_optimizer(source_plan).find_best()?; + // println!("{:#?}", best_plan); + + let tuples = execute(best_plan, Arc::new(transaction)).await?; + + println!("{}", create_table(&tuples)); + Ok(()) + } +} diff --git a/src/execution/mod.rs b/src/execution/mod.rs index 31ce96e2..effe6bf8 100644 --- a/src/execution/mod.rs +++ b/src/execution/mod.rs @@ -1,9 +1,13 @@ -pub mod executor; +#[cfg(feature = "codegen_execute")] +pub mod codegen; +pub mod volcano; use crate::binder::BindError; use crate::catalog::CatalogError; use crate::storage::StorageError; use crate::types::errors::TypeError; +#[cfg(feature = "codegen_execute")] +use mlua::prelude::LuaError; use sqlparser::parser::ParserError; #[derive(thiserror::Error, Debug)] @@ -60,6 +64,13 @@ pub enum ExecutorError { #[source] tokio::task::JoinError, ), + #[cfg(feature = "codegen_execute")] + #[error("lua error")] + LuaError( + #[from] + #[source] + LuaError, + ), #[error("channel close")] ChannelClose, } diff --git a/src/execution/executor/ddl/alter_table/add_column.rs b/src/execution/volcano/ddl/alter_table/add_column.rs similarity index 94% rename from src/execution/executor/ddl/alter_table/add_column.rs rename to src/execution/volcano/ddl/alter_table/add_column.rs index 5a6297c1..6da80c31 100644 --- a/src/execution/executor/ddl/alter_table/add_column.rs +++ b/src/execution/volcano/ddl/alter_table/add_column.rs @@ -1,4 +1,4 @@ -use crate::execution::executor::BoxedExecutor; +use crate::execution::volcano::BoxedExecutor; use crate::types::tuple::Tuple; use crate::types::value::DataValue; use crate::{execution::ExecutorError, types::tuple_builder::TupleBuilder}; @@ -8,7 +8,7 @@ use std::sync::Arc; use crate::types::index::Index; use crate::{ - execution::executor::Executor, planner::operator::alter_table::add_column::AddColumnOperator, + execution::volcano::Executor, planner::operator::alter_table::add_column::AddColumnOperator, storage::Transaction, }; diff --git a/src/execution/executor/ddl/alter_table/drop_column.rs b/src/execution/volcano/ddl/alter_table/drop_column.rs similarity index 97% rename from src/execution/executor/ddl/alter_table/drop_column.rs rename to src/execution/volcano/ddl/alter_table/drop_column.rs index f114e7eb..27b47074 100644 --- a/src/execution/executor/ddl/alter_table/drop_column.rs +++ b/src/execution/volcano/ddl/alter_table/drop_column.rs @@ -1,5 +1,5 @@ use crate::binder::BindError; -use crate::execution::executor::{BoxedExecutor, Executor}; +use 
crate::execution::volcano::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::planner::operator::alter_table::drop_column::DropColumnOperator; use crate::storage::Transaction; diff --git a/src/execution/executor/ddl/alter_table/mod.rs b/src/execution/volcano/ddl/alter_table/mod.rs similarity index 100% rename from src/execution/executor/ddl/alter_table/mod.rs rename to src/execution/volcano/ddl/alter_table/mod.rs diff --git a/src/execution/executor/ddl/create_table.rs b/src/execution/volcano/ddl/create_table.rs similarity index 95% rename from src/execution/executor/ddl/create_table.rs rename to src/execution/volcano/ddl/create_table.rs index 387b8018..45d66270 100644 --- a/src/execution/executor/ddl/create_table.rs +++ b/src/execution/volcano/ddl/create_table.rs @@ -1,4 +1,4 @@ -use crate::execution::executor::{BoxedExecutor, Executor}; +use crate::execution::volcano::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::planner::operator::create_table::CreateTableOperator; use crate::storage::Transaction; diff --git a/src/execution/executor/ddl/drop_table.rs b/src/execution/volcano/ddl/drop_table.rs similarity index 93% rename from src/execution/executor/ddl/drop_table.rs rename to src/execution/volcano/ddl/drop_table.rs index 92366730..ae94f194 100644 --- a/src/execution/executor/ddl/drop_table.rs +++ b/src/execution/volcano/ddl/drop_table.rs @@ -1,4 +1,4 @@ -use crate::execution::executor::{BoxedExecutor, Executor}; +use crate::execution::volcano::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::planner::operator::drop_table::DropTableOperator; use crate::storage::Transaction; diff --git a/src/execution/executor/ddl/mod.rs b/src/execution/volcano/ddl/mod.rs similarity index 100% rename from src/execution/executor/ddl/mod.rs rename to src/execution/volcano/ddl/mod.rs diff --git a/src/execution/executor/ddl/truncate.rs b/src/execution/volcano/ddl/truncate.rs similarity index 93% rename from src/execution/executor/ddl/truncate.rs rename to src/execution/volcano/ddl/truncate.rs index 661959cd..41f858af 100644 --- a/src/execution/executor/ddl/truncate.rs +++ b/src/execution/volcano/ddl/truncate.rs @@ -1,4 +1,4 @@ -use crate::execution::executor::{BoxedExecutor, Executor}; +use crate::execution::volcano::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::planner::operator::truncate::TruncateOperator; use crate::storage::Transaction; diff --git a/src/execution/executor/dml/copy_from_file.rs b/src/execution/volcano/dml/copy_from_file.rs similarity index 97% rename from src/execution/executor/dml/copy_from_file.rs rename to src/execution/volcano/dml/copy_from_file.rs index cb4609bc..eecef01f 100644 --- a/src/execution/executor/dml/copy_from_file.rs +++ b/src/execution/volcano/dml/copy_from_file.rs @@ -1,5 +1,5 @@ use crate::binder::copy::FileFormat; -use crate::execution::executor::{BoxedExecutor, Executor}; +use crate::execution::volcano::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::planner::operator::copy_from_file::CopyFromFileOperator; use crate::storage::Transaction; @@ -132,6 +132,7 @@ mod tests { summary: ColumnSummary { id: Some(0), name: "a".to_string(), + table_name: None, }, nullable: false, desc: ColumnDesc::new(LogicalType::Integer, true, false, None), @@ -141,6 +142,7 @@ mod tests { summary: ColumnSummary { id: Some(1), name: "b".to_string(), + table_name: None, }, nullable: false, desc: ColumnDesc::new(LogicalType::Float, false, false, None), @@ -150,6 
+152,7 @@ mod tests { summary: ColumnSummary { id: Some(1), name: "c".to_string(), + table_name: None, }, nullable: false, desc: ColumnDesc::new(LogicalType::Varchar(Some(10)), false, false, None), diff --git a/src/execution/executor/dml/copy_to_file.rs b/src/execution/volcano/dml/copy_to_file.rs similarity index 100% rename from src/execution/executor/dml/copy_to_file.rs rename to src/execution/volcano/dml/copy_to_file.rs diff --git a/src/execution/executor/dml/delete.rs b/src/execution/volcano/dml/delete.rs similarity index 97% rename from src/execution/executor/dml/delete.rs rename to src/execution/volcano/dml/delete.rs index 9eb5937d..d96d785f 100644 --- a/src/execution/executor/dml/delete.rs +++ b/src/execution/volcano/dml/delete.rs @@ -1,5 +1,5 @@ use crate::catalog::TableName; -use crate::execution::executor::{BoxedExecutor, Executor}; +use crate::execution::volcano::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::planner::operator::delete::DeleteOperator; use crate::storage::Transaction; diff --git a/src/execution/executor/dml/insert.rs b/src/execution/volcano/dml/insert.rs similarity index 98% rename from src/execution/executor/dml/insert.rs rename to src/execution/volcano/dml/insert.rs index cbf5ed92..1aabfe5b 100644 --- a/src/execution/executor/dml/insert.rs +++ b/src/execution/volcano/dml/insert.rs @@ -1,5 +1,5 @@ use crate::catalog::TableName; -use crate::execution::executor::{BoxedExecutor, Executor}; +use crate::execution::volcano::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::planner::operator::insert::InsertOperator; use crate::storage::Transaction; diff --git a/src/execution/executor/dml/mod.rs b/src/execution/volcano/dml/mod.rs similarity index 100% rename from src/execution/executor/dml/mod.rs rename to src/execution/volcano/dml/mod.rs diff --git a/src/execution/executor/dml/update.rs b/src/execution/volcano/dml/update.rs similarity index 98% rename from src/execution/executor/dml/update.rs rename to src/execution/volcano/dml/update.rs index b5a93eb6..5584fcc7 100644 --- a/src/execution/executor/dml/update.rs +++ b/src/execution/volcano/dml/update.rs @@ -1,5 +1,5 @@ use crate::catalog::TableName; -use crate::execution::executor::{BoxedExecutor, Executor}; +use crate::execution::volcano::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::planner::operator::update::UpdateOperator; use crate::storage::Transaction; diff --git a/src/execution/executor/dql/aggregate/avg.rs b/src/execution/volcano/dql/aggregate/avg.rs similarity index 89% rename from src/execution/executor/dql/aggregate/avg.rs rename to src/execution/volcano/dql/aggregate/avg.rs index 599a1295..822c5de7 100644 --- a/src/execution/executor/dql/aggregate/avg.rs +++ b/src/execution/volcano/dql/aggregate/avg.rs @@ -1,5 +1,5 @@ -use crate::execution::executor::dql::aggregate::sum::SumAccumulator; -use crate::execution::executor::dql::aggregate::Accumulator; +use crate::execution::volcano::dql::aggregate::sum::SumAccumulator; +use crate::execution::volcano::dql::aggregate::Accumulator; use crate::execution::ExecutorError; use crate::expression::value_compute::binary_op; use crate::expression::BinaryOperator; diff --git a/src/execution/executor/dql/aggregate/count.rs b/src/execution/volcano/dql/aggregate/count.rs similarity index 95% rename from src/execution/executor/dql/aggregate/count.rs rename to src/execution/volcano/dql/aggregate/count.rs index d649b022..77e6391c 100644 --- a/src/execution/executor/dql/aggregate/count.rs +++ 
b/src/execution/volcano/dql/aggregate/count.rs @@ -1,4 +1,4 @@ -use crate::execution::executor::dql::aggregate::Accumulator; +use crate::execution::volcano::dql::aggregate::Accumulator; use crate::execution::ExecutorError; use crate::types::value::{DataValue, ValueRef}; use ahash::RandomState; diff --git a/src/execution/executor/dql/aggregate/hash_agg.rs b/src/execution/volcano/dql/aggregate/hash_agg.rs similarity index 62% rename from src/execution/executor/dql/aggregate/hash_agg.rs rename to src/execution/volcano/dql/aggregate/hash_agg.rs index be59f892..28cef0e1 100644 --- a/src/execution/executor/dql/aggregate/hash_agg.rs +++ b/src/execution/volcano/dql/aggregate/hash_agg.rs @@ -1,20 +1,21 @@ -use crate::execution::executor::dql::aggregate::create_accumulators; -use crate::execution::executor::{BoxedExecutor, Executor}; +use crate::catalog::ColumnRef; +use crate::execution::volcano::dql::aggregate::{create_accumulators, Accumulator}; +use crate::execution::volcano::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::expression::ScalarExpression; use crate::planner::operator::aggregate::AggregateOperator; use crate::storage::Transaction; use crate::types::tuple::Tuple; use crate::types::value::ValueRef; -use ahash::{HashMap, HashMapExt}; +use ahash::HashMap; use futures_async_stream::try_stream; use itertools::Itertools; use std::cell::RefCell; pub struct HashAggExecutor { - pub agg_calls: Vec<ScalarExpression>, - pub groupby_exprs: Vec<ScalarExpression>, - pub input: BoxedExecutor, + agg_calls: Vec<ScalarExpression>, + groupby_exprs: Vec<ScalarExpression>, + input: BoxedExecutor, } impl From<(AggregateOperator, BoxedExecutor)> for HashAggExecutor { @@ -41,57 +42,76 @@ impl<T: Transaction> Executor<T> for HashAggExecutor { } } -impl HashAggExecutor { - #[try_stream(boxed, ok = Tuple, error = ExecutorError)] - pub async fn _execute(self) { - let mut group_and_agg_columns_option = None; - let mut group_hash_accs = HashMap::new(); +pub(crate) struct HashAggStatus { + agg_calls: Vec<ScalarExpression>, + groupby_exprs: Vec<ScalarExpression>, - #[for_await] - for tuple in self.input { - let tuple = tuple?; + group_columns: Vec<ColumnRef>, + group_hash_accs: HashMap<Vec<ValueRef>, Vec<Box<dyn Accumulator>>>, +} - // 1. build group and agg columns for hash_agg columns. - // Tips: AggCall First - group_and_agg_columns_option.get_or_insert_with(|| { - self.agg_calls - .iter() - .chain(self.groupby_exprs.iter()) - .map(|expr| expr.output_columns()) - .collect_vec() - }); +impl HashAggStatus { + pub(crate) fn new( + agg_calls: Vec<ScalarExpression>, + groupby_exprs: Vec<ScalarExpression>, + ) -> Self { + HashAggStatus { + agg_calls, + groupby_exprs, + group_columns: vec![], + group_hash_accs: Default::default(), + } + } - // 2.1 evaluate agg exprs and collect the result values for later accumulators. - let values: Vec<ValueRef> = self + pub(crate) fn update(&mut self, tuple: Tuple) -> Result<(), ExecutorError> { + // 1. build the group and agg columns for the hash_agg output. + // Tip: agg calls come first. + if self.group_columns.is_empty() { + self.group_columns = self .agg_calls .iter() - .map(|expr| { - if let ScalarExpression::AggCall { args, ..
} = expr { - args[0].eval(&tuple) - } else { - unreachable!() - } - }) - .try_collect()?; + .chain(self.groupby_exprs.iter()) + .map(|expr| expr.output_column()) + .collect_vec(); + } - let group_keys: Vec<ValueRef> = self - .groupby_exprs - .iter() - .map(|expr| expr.eval(&tuple)) - .try_collect()?; - - for (acc, value) in group_hash_accs - .entry(group_keys) - .or_insert_with(|| create_accumulators(&self.agg_calls)) - .iter_mut() - .zip_eq(values.iter()) - { - acc.update_value(value)?; - } + // 2.1 evaluate agg exprs and collect the result values for later accumulators. + let values: Vec<ValueRef> = self + .agg_calls + .iter() + .map(|expr| { + if let ScalarExpression::AggCall { args, .. } = expr { + args[0].eval(&tuple) + } else { + unreachable!() + } + }) + .try_collect()?; + + let group_keys: Vec<ValueRef> = self + .groupby_exprs + .iter() + .map(|expr| expr.eval(&tuple)) + .try_collect()?; + + for (acc, value) in self + .group_hash_accs + .entry(group_keys) + .or_insert_with(|| create_accumulators(&self.agg_calls)) + .iter_mut() + .zip_eq(values.iter()) + { + acc.update_value(value)?; } - if let Some(group_and_agg_columns) = group_and_agg_columns_option { - for (group_keys, accs) in group_hash_accs { + Ok(()) + } + + pub(crate) fn to_tuples(&mut self) -> Result<Vec<Tuple>, ExecutorError> { + Ok(self + .group_hash_accs + .drain() + .map(|(group_keys, accs)| { // Tips: Accumulator First let values: Vec<ValueRef> = accs .iter() @@ -99,12 +119,34 @@ impl HashAggExecutor { .chain(group_keys.into_iter().map(Ok)) .try_collect()?; - yield Tuple { + Ok::<Tuple, ExecutorError>(Tuple { id: None, - columns: group_and_agg_columns.clone(), + columns: self.group_columns.clone(), values, - }; - } + }) + }) + .try_collect()?) + } +} + +impl HashAggExecutor { + #[try_stream(boxed, ok = Tuple, error = ExecutorError)] + pub async fn _execute(self) { + let HashAggExecutor { + agg_calls, + groupby_exprs, + .. + } = self; + + let mut agg_status = HashAggStatus::new(agg_calls, groupby_exprs); + + #[for_await] + for tuple in self.input { + agg_status.update(tuple?)?; + } + + for tuple in agg_status.to_tuples()?
{ + yield tuple; } } } @@ -112,10 +154,10 @@ impl HashAggExecutor { #[cfg(test)] mod test { use crate::catalog::{ColumnCatalog, ColumnDesc}; - use crate::execution::executor::dql::aggregate::hash_agg::HashAggExecutor; - use crate::execution::executor::dql::test::build_integers; - use crate::execution::executor::dql::values::Values; - use crate::execution::executor::{try_collect, Executor}; + use crate::execution::volcano::dql::aggregate::hash_agg::HashAggExecutor; + use crate::execution::volcano::dql::test::build_integers; + use crate::execution::volcano::dql::values::Values; + use crate::execution::volcano::{try_collect, Executor}; use crate::execution::ExecutorError; use crate::expression::agg::AggKind; use crate::expression::ScalarExpression; diff --git a/src/execution/executor/dql/aggregate/min_max.rs b/src/execution/volcano/dql/aggregate/min_max.rs similarity index 95% rename from src/execution/executor/dql/aggregate/min_max.rs rename to src/execution/volcano/dql/aggregate/min_max.rs index dae6f725..1cbf649d 100644 --- a/src/execution/executor/dql/aggregate/min_max.rs +++ b/src/execution/volcano/dql/aggregate/min_max.rs @@ -1,4 +1,4 @@ -use crate::execution::executor::dql::aggregate::Accumulator; +use crate::execution::volcano::dql::aggregate::Accumulator; use crate::execution::ExecutorError; use crate::expression::value_compute::binary_op; use crate::expression::BinaryOperator; diff --git a/src/execution/executor/dql/aggregate/mod.rs b/src/execution/volcano/dql/aggregate/mod.rs similarity index 81% rename from src/execution/executor/dql/aggregate/mod.rs rename to src/execution/volcano/dql/aggregate/mod.rs index c40ee2c4..6a416763 100644 --- a/src/execution/executor/dql/aggregate/mod.rs +++ b/src/execution/volcano/dql/aggregate/mod.rs @@ -5,12 +5,12 @@ mod min_max; pub mod simple_agg; mod sum; -use crate::execution::executor::dql::aggregate::avg::AvgAccumulator; -use crate::execution::executor::dql::aggregate::count::{ CountAccumulator, DistinctCountAccumulator, }; -use crate::execution::executor::dql::aggregate::min_max::MinMaxAccumulator; -use crate::execution::executor::dql::aggregate::sum::{DistinctSumAccumulator, SumAccumulator}; +use crate::execution::volcano::dql::aggregate::avg::AvgAccumulator; +use crate::execution::volcano::dql::aggregate::count::{ CountAccumulator, DistinctCountAccumulator, }; +use crate::execution::volcano::dql::aggregate::min_max::MinMaxAccumulator; +use crate::execution::volcano::dql::aggregate::sum::{DistinctSumAccumulator, SumAccumulator}; use crate::execution::ExecutorError; use crate::expression::agg::AggKind; use crate::expression::ScalarExpression; @@ -49,6 +49,6 @@ fn create_accumulator(expr: &ScalarExpression) -> Box<dyn Accumulator> { } } -fn create_accumulators(exprs: &[ScalarExpression]) -> Vec<Box<dyn Accumulator>> { +pub(crate) fn create_accumulators(exprs: &[ScalarExpression]) -> Vec<Box<dyn Accumulator>> { exprs.iter().map(create_accumulator).collect() } diff --git a/src/execution/executor/dql/aggregate/simple_agg.rs b/src/execution/volcano/dql/aggregate/simple_agg.rs similarity index 88% rename from src/execution/executor/dql/aggregate/simple_agg.rs rename to src/execution/volcano/dql/aggregate/simple_agg.rs index 81454d94..512cf619 100644 --- a/src/execution/executor/dql/aggregate/simple_agg.rs +++ b/src/execution/volcano/dql/aggregate/simple_agg.rs @@ -1,5 +1,5 @@ -use crate::execution::executor::dql::aggregate::create_accumulators; -use crate::execution::executor::{BoxedExecutor, Executor}; +use crate::execution::volcano::dql::aggregate::create_accumulators; +use crate::execution::volcano::{BoxedExecutor, Executor}; use
crate::execution::ExecutorError; use crate::expression::ScalarExpression; use crate::planner::operator::aggregate::AggregateOperator; @@ -11,8 +11,8 @@ use itertools::Itertools; use std::cell::RefCell; pub struct SimpleAggExecutor { - pub agg_calls: Vec<ScalarExpression>, - pub input: BoxedExecutor, + agg_calls: Vec<ScalarExpression>, + input: BoxedExecutor, } impl From<(AggregateOperator, BoxedExecutor)> for SimpleAggExecutor { @@ -42,7 +42,7 @@ impl SimpleAggExecutor { columns_option.get_or_insert_with(|| { self.agg_calls .iter() - .map(|expr| expr.output_columns()) + .map(|expr| expr.output_column()) .collect_vec() }); diff --git a/src/execution/executor/dql/aggregate/sum.rs b/src/execution/volcano/dql/aggregate/sum.rs similarity index 96% rename from src/execution/executor/dql/aggregate/sum.rs rename to src/execution/volcano/dql/aggregate/sum.rs index 46d84e8c..baee8e14 100644 --- a/src/execution/executor/dql/aggregate/sum.rs +++ b/src/execution/volcano/dql/aggregate/sum.rs @@ -1,4 +1,4 @@ -use crate::execution::executor::dql::aggregate::Accumulator; +use crate::execution::volcano::dql::aggregate::Accumulator; use crate::execution::ExecutorError; use crate::expression::value_compute::binary_op; use crate::expression::BinaryOperator; diff --git a/src/execution/executor/dql/dummy.rs b/src/execution/volcano/dql/dummy.rs similarity index 88% rename from src/execution/executor/dql/dummy.rs rename to src/execution/volcano/dql/dummy.rs index 5e8e756f..37d55ec8 100644 --- a/src/execution/executor/dql/dummy.rs +++ b/src/execution/volcano/dql/dummy.rs @@ -1,4 +1,4 @@ -use crate::execution::executor::{BoxedExecutor, Executor}; +use crate::execution::volcano::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::storage::Transaction; use crate::types::tuple::Tuple; diff --git a/src/execution/executor/dql/filter.rs b/src/execution/volcano/dql/filter.rs similarity index 95% rename from src/execution/executor/dql/filter.rs rename to src/execution/volcano/dql/filter.rs index b86da170..38180c2a 100644 --- a/src/execution/executor/dql/filter.rs +++ b/src/execution/volcano/dql/filter.rs @@ -1,4 +1,4 @@ -use crate::execution::executor::{BoxedExecutor, Executor}; +use crate::execution::volcano::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::expression::ScalarExpression; use crate::planner::operator::filter::FilterOperator; diff --git a/src/execution/executor/dql/index_scan.rs b/src/execution/volcano/dql/index_scan.rs similarity index 95% rename from src/execution/executor/dql/index_scan.rs rename to src/execution/volcano/dql/index_scan.rs index 4bea39d8..8bc68033 100644 --- a/src/execution/executor/dql/index_scan.rs +++ b/src/execution/volcano/dql/index_scan.rs @@ -1,4 +1,4 @@ -use crate::execution::executor::{BoxedExecutor, Executor}; +use crate::execution::volcano::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::planner::operator::scan::ScanOperator; use crate::storage::{Iter, Transaction}; diff --git a/src/execution/executor/dql/join/hash_join.rs b/src/execution/volcano/dql/join/hash_join.rs similarity index 65% rename from src/execution/executor/dql/join/hash_join.rs rename to src/execution/volcano/dql/join/hash_join.rs index 8f785624..b5606260 100644 --- a/src/execution/executor/dql/join/hash_join.rs +++ b/src/execution/volcano/dql/join/hash_join.rs @@ -1,17 +1,18 @@ use crate::catalog::{ColumnCatalog, ColumnRef}; -use crate::execution::executor::dql::join::joins_nullable; -use crate::execution::executor::{BoxedExecutor, Executor}; +use
crate::execution::volcano::dql::join::joins_nullable; +use crate::execution::volcano::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::expression::ScalarExpression; use crate::planner::operator::join::{JoinCondition, JoinOperator, JoinType}; use crate::storage::Transaction; use crate::types::errors::TypeError; use crate::types::tuple::Tuple; -use crate::types::value::DataValue; -use ahash::{HashMap, HashMapExt, HashSet, HashSetExt, RandomState}; +use crate::types::value::{DataValue, ValueRef}; +use ahash::{HashMap, HashSet, HashSetExt, RandomState}; use futures_async_stream::try_stream; use itertools::Itertools; use std::cell::RefCell; +use std::mem; use std::sync::Arc; pub struct HashJoin { @@ -44,16 +45,25 @@ impl<T: Transaction> Executor<T> for HashJoin { } } -impl HashJoin { - #[try_stream(boxed, ok = Tuple, error = ExecutorError)] - pub async fn _execute(self) { - let HashJoin { - on, - ty, - left_input, - right_input, - } = self; +pub(crate) struct HashJoinStatus { + ty: JoinType, + filter: Option<ScalarExpression>, + + join_columns: Vec<ColumnRef>, + used_set: HashSet<u64>, + build_map: HashMap<u64, Vec<Tuple>>, + hash_random_state: RandomState, + + left_init_flag: bool, + left_force_nullable: bool, + on_left_keys: Vec<ScalarExpression>, + right_init_flag: bool, + right_force_nullable: bool, + on_right_keys: Vec<ScalarExpression>, +} +impl HashJoinStatus { + pub(crate) fn new(on: JoinCondition, ty: JoinType) -> Self { if ty == JoinType::Cross { unreachable!("Cross join should not be in HashJoinExecutor"); } @@ -64,155 +74,192 @@ JoinCondition::On { on, filter } => (on.into_iter().unzip(), filter), JoinCondition::None => unreachable!("HashJoin must has on condition"), }; + let (left_force_nullable, right_force_nullable) = joins_nullable(&ty); + + HashJoinStatus { + ty, + filter, + + join_columns: vec![], + used_set: HashSet::new(), + build_map: Default::default(), + hash_random_state: RandomState::with_seeds(0, 0, 0, 0), + + left_init_flag: false, + left_force_nullable, + on_left_keys, + right_init_flag: false, + right_force_nullable, + on_right_keys, + } + } - let mut join_columns = Vec::new(); - let mut used_set = HashSet::<u64>::new(); - let mut left_map = HashMap::new(); + pub(crate) fn left_build(&mut self, tuple: Tuple) -> Result<(), ExecutorError> { + let HashJoinStatus { + on_left_keys, + hash_random_state, + left_init_flag, + join_columns, + left_force_nullable, + build_map, + .. + } = self; - let hash_random_state = RandomState::with_seeds(0, 0, 0, 0); - let (left_force_nullable, right_force_nullable) = joins_nullable(&ty); + let hash = Self::hash_row(on_left_keys, hash_random_state, &tuple)?; - // build phase: - // 1.construct hashtable, one hash key may contains multiple rows indices. - // 2.merged all left tuples. - let mut left_init_flag = false; - #[for_await] - for tuple in left_input { - let tuple: Tuple = tuple?; - let hash = Self::hash_row(&on_left_keys, &hash_random_state, &tuple)?; + if !*left_init_flag { + Self::columns_filling(&tuple, join_columns, *left_force_nullable); + let _ = mem::replace(left_init_flag, true); + } - if !left_init_flag { - Self::columns_filling(&tuple, &mut join_columns, left_force_nullable); - left_init_flag = true; - } + build_map.entry(hash).or_insert(Vec::new()).push(tuple); - left_map.entry(hash).or_insert(Vec::new()).push(tuple); + Ok(()) + } + + pub(crate) fn right_probe(&mut self, tuple: Tuple) -> Result<Vec<Tuple>, ExecutorError> { + let HashJoinStatus { + hash_random_state, + join_columns, + on_right_keys, + right_init_flag, + right_force_nullable, + build_map, + used_set, + ty, + filter, + ..
+ } = self; + + let right_cols_len = tuple.columns.len(); + let hash = Self::hash_row(&on_right_keys, &hash_random_state, &tuple)?; + + if !*right_init_flag { + Self::columns_filling(&tuple, join_columns, *right_force_nullable); + let _ = mem::replace(right_init_flag, true); } - // probe phase - let mut right_init_flag = false; - #[for_await] - for tuple in right_input { - let tuple: Tuple = tuple?; - let right_cols_len = tuple.columns.len(); - let hash = Self::hash_row(&on_right_keys, &hash_random_state, &tuple)?; + let mut join_tuples = if let Some(tuples) = build_map.get(&hash) { + let _ = used_set.insert(hash); - if !right_init_flag { - Self::columns_filling(&tuple, &mut join_columns, right_force_nullable); - right_init_flag = true; - } + tuples + .iter() + .map(|Tuple { values, .. }| { + let full_values = values + .iter() + .cloned() + .chain(tuple.values.clone()) + .collect_vec(); - let mut join_tuples = if let Some(tuples) = left_map.get(&hash) { - let _ = used_set.insert(hash); - - tuples - .iter() - .map(|Tuple { values, .. }| { - let full_values = values - .iter() - .cloned() - .chain(tuple.values.clone()) - .collect_vec(); - - Tuple { - id: None, - columns: join_columns.clone(), - values: full_values, - } - }) - .collect_vec() - } else if matches!(ty, JoinType::Right | JoinType::Full) { - let empty_len = join_columns.len() - right_cols_len; - let values = join_columns[..empty_len] - .iter() - .map(|col| Arc::new(DataValue::none(col.datatype()))) - .chain(tuple.values) - .collect_vec(); - - vec![Tuple { - id: None, - columns: join_columns.clone(), - values, - }] - } else { - vec![] - }; - - // on filter - if let (Some(expr), false) = ( - &filter, - join_tuples.is_empty() || matches!(ty, JoinType::Full | JoinType::Cross), - ) { - let mut filter_tuples = Vec::with_capacity(join_tuples.len()); - - for mut tuple in join_tuples { - if let DataValue::Boolean(option) = expr.eval(&tuple)?.as_ref() { - if let Some(false) | None = option { - let full_cols_len = tuple.columns.len(); - let left_cols_len = full_cols_len - right_cols_len; - - match ty { - JoinType::Left => { - for i in left_cols_len..full_cols_len { - let value_type = tuple.columns[i].datatype(); - - tuple.values[i] = Arc::new(DataValue::none(value_type)) - } - filter_tuples.push(tuple) + Tuple { + id: None, + columns: join_columns.clone(), + values: full_values, + } + }) + .collect_vec() + } else if matches!(ty, JoinType::Right | JoinType::Full) { + let empty_len = join_columns.len() - right_cols_len; + let values = join_columns[..empty_len] + .iter() + .map(|col| Arc::new(DataValue::none(col.datatype()))) + .chain(tuple.values) + .collect_vec(); + + vec![Tuple { + id: None, + columns: join_columns.clone(), + values, + }] + } else { + vec![] + }; + + // on filter + if let (Some(expr), false) = ( + &filter, + join_tuples.is_empty() || matches!(ty, JoinType::Full | JoinType::Cross), + ) { + let mut filter_tuples = Vec::with_capacity(join_tuples.len()); + + for mut tuple in join_tuples { + if let DataValue::Boolean(option) = expr.eval(&tuple)?.as_ref() { + if let Some(false) | None = option { + let full_cols_len = tuple.columns.len(); + let left_cols_len = full_cols_len - right_cols_len; + + match ty { + JoinType::Left => { + for i in left_cols_len..full_cols_len { + let value_type = tuple.columns[i].datatype(); + + tuple.values[i] = Arc::new(DataValue::none(value_type)) } - JoinType::Right => { - for i in 0..left_cols_len { - let value_type = tuple.columns[i].datatype(); + filter_tuples.push(tuple) + } + JoinType::Right => { + 
for i in 0..left_cols_len { + let value_type = tuple.columns[i].datatype(); - tuple.values[i] = Arc::new(DataValue::none(value_type)) - } - filter_tuples.push(tuple) + tuple.values[i] = Arc::new(DataValue::none(value_type)) } + filter_tuples.push(tuple) } - _ => (), + _ => (), } } else { - filter_tuples.push(tuple) + filter_tuples.push(tuple) } } else { - unreachable!("only bool"); + unreachable!("only bool"); } - - join_tuples = filter_tuples; } - for tuple in join_tuples { - yield tuple - } + join_tuples = filter_tuples; } - if matches!(ty, JoinType::Left | JoinType::Full) { - for (hash, tuples) in left_map { - if used_set.contains(&hash) { - continue; - } - - for Tuple { - mut values, - columns, - .. - } in tuples - { - let mut right_empties = join_columns[columns.len()..] - .iter() - .map(|col| Arc::new(DataValue::none(col.datatype()))) - .collect_vec(); + Ok(join_tuples) + } - values.append(&mut right_empties); + pub(crate) fn build_drop(&mut self) -> Vec<Tuple> { + let HashJoinStatus { + join_columns, + build_map, + used_set, + ty, + .. + } = self; - yield Tuple { - id: None, - columns: join_columns.clone(), - values, - } - } - } - } + matches!(ty, JoinType::Left | JoinType::Full) + .then(|| { + build_map + .drain() + .filter(|(hash, _)| !used_set.contains(hash)) + .map(|(_, mut tuples)| { + for Tuple { + values, + columns, + id, + } in tuples.iter_mut() + { + let _ = mem::replace(id, None); + let (mut right_values, mut right_columns): ( + Vec<ValueRef>, + Vec<ColumnRef>, + ) = join_columns[columns.len()..] + .iter() + .map(|col| (Arc::new(DataValue::none(col.datatype())), col.clone())) + .unzip(); + + values.append(&mut right_values); + columns.append(&mut right_columns); + } + tuples + }) + .flatten() + .collect_vec() + }) + .unwrap_or_else(|| vec![]) } fn columns_filling(tuple: &Tuple, join_columns: &mut Vec<ColumnRef>, force_nullable: bool) { @@ -246,13 +293,51 @@ } } +impl HashJoin { + #[try_stream(boxed, ok = Tuple, error = ExecutorError)] + pub async fn _execute(self) { + let HashJoin { + on, + ty, + left_input, + right_input, + } = self; + + let mut join_status = HashJoinStatus::new(on, ty); + + // build phase: + // 1. construct the hashtable; one hash key may contain multiple row indices. + // 2. merge all left tuples. + #[for_await] + for tuple in left_input { + let tuple: Tuple = tuple?; + + join_status.left_build(tuple)?; + } + + // probe phase + #[for_await] + for tuple in right_input { + let tuple: Tuple = tuple?; + + for tuple in join_status.right_probe(tuple)?
{ + yield tuple + } + } + + for tuple in join_status.build_drop() { + yield tuple + } + } +} + #[cfg(test)] mod test { use crate::catalog::{ColumnCatalog, ColumnDesc}; - use crate::execution::executor::dql::join::hash_join::HashJoin; - use crate::execution::executor::dql::test::build_integers; - use crate::execution::executor::dql::values::Values; - use crate::execution::executor::{try_collect, BoxedExecutor, Executor}; + use crate::execution::volcano::dql::join::hash_join::HashJoin; + use crate::execution::volcano::dql::test::build_integers; + use crate::execution::volcano::dql::values::Values; + use crate::execution::volcano::{try_collect, BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::expression::ScalarExpression; use crate::planner::operator::join::{JoinCondition, JoinOperator, JoinType}; diff --git a/src/execution/executor/dql/join/mod.rs b/src/execution/volcano/dql/join/mod.rs similarity index 100% rename from src/execution/executor/dql/join/mod.rs rename to src/execution/volcano/dql/join/mod.rs diff --git a/src/execution/executor/dql/limit.rs b/src/execution/volcano/dql/limit.rs similarity index 95% rename from src/execution/executor/dql/limit.rs rename to src/execution/volcano/dql/limit.rs index d3c0ddb5..c0c49a3c 100644 --- a/src/execution/executor/dql/limit.rs +++ b/src/execution/volcano/dql/limit.rs @@ -1,4 +1,4 @@ -use crate::execution::executor::{BoxedExecutor, Executor}; +use crate::execution::volcano::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::planner::operator::limit::LimitOperator; use crate::storage::Transaction; diff --git a/src/execution/executor/dql/mod.rs b/src/execution/volcano/dql/mod.rs similarity index 100% rename from src/execution/executor/dql/mod.rs rename to src/execution/volcano/dql/mod.rs diff --git a/src/execution/executor/dql/projection.rs b/src/execution/volcano/dql/projection.rs similarity index 81% rename from src/execution/executor/dql/projection.rs rename to src/execution/volcano/dql/projection.rs index d3a0a218..fa82ffdd 100644 --- a/src/execution/executor/dql/projection.rs +++ b/src/execution/volcano/dql/projection.rs @@ -1,4 +1,4 @@ -use crate::execution::executor::{BoxedExecutor, Executor}; +use crate::execution::volcano::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::expression::ScalarExpression; use crate::planner::operator::project::ProjectOperator; @@ -31,21 +31,20 @@ impl Projection { #[for_await] for tuple in input { - let tuple = tuple?; + let mut tuple = tuple?; let mut columns = Vec::with_capacity(exprs.len()); let mut values = Vec::with_capacity(exprs.len()); for expr in exprs.iter() { values.push(expr.eval(&tuple)?); - columns.push(expr.output_columns()); + columns.push(expr.output_column()); } - yield Tuple { - id: None, - columns, - values, - }; + tuple.columns = columns; + tuple.values = values; + + yield tuple; } } } diff --git a/src/execution/executor/dql/seq_scan.rs b/src/execution/volcano/dql/seq_scan.rs similarity index 94% rename from src/execution/executor/dql/seq_scan.rs rename to src/execution/volcano/dql/seq_scan.rs index 27b798fb..82b4d1d7 100644 --- a/src/execution/executor/dql/seq_scan.rs +++ b/src/execution/volcano/dql/seq_scan.rs @@ -1,4 +1,4 @@ -use crate::execution::executor::{BoxedExecutor, Executor}; +use crate::execution::volcano::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::planner::operator::scan::ScanOperator; use crate::storage::{Iter, Transaction}; diff --git 
a/src/execution/executor/dql/sort.rs b/src/execution/volcano/dql/sort.rs similarity index 71% rename from src/execution/executor/dql/sort.rs rename to src/execution/volcano/dql/sort.rs index ad64503e..3767f899 100644 --- a/src/execution/executor/dql/sort.rs +++ b/src/execution/volcano/dql/sort.rs @@ -1,4 +1,4 @@ -use crate::execution::executor::{BoxedExecutor, Executor}; +use crate::execution::volcano::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::planner::operator::sort::{SortField, SortOperator}; use crate::storage::Transaction; @@ -38,6 +38,40 @@ fn radix_sort<T>(mut tuples: Vec<(T, Vec<u8>)>) -> Vec<T> { Vec::new() } +pub(crate) fn sort( + sort_fields: &[SortField], + tuples: Vec<Tuple>, +) -> Result<Vec<Tuple>, ExecutorError> { + let tuples_with_keys: Vec<(Tuple, Vec<u8>)> = tuples + .into_iter() + .map(|tuple| { + let mut full_key = Vec::new(); + + for SortField { + expr, + nulls_first, + asc, + } in sort_fields + { + let mut key = Vec::new(); + + expr.eval(&tuple)?.memcomparable_encode(&mut key)?; + key.push(if *nulls_first { u8::MIN } else { u8::MAX }); + + if !asc { + for byte in key.iter_mut() { + *byte ^= 0xFF; + } + } + full_key.extend(key); + } + Ok::<(Tuple, Vec<u8>), TypeError>((tuple, full_key)) + }) + .try_collect()?; + + Ok(radix_sort(tuples_with_keys)) +} + pub struct Sort { sort_fields: Vec<SortField>, limit: Option<usize>, @@ -74,33 +108,7 @@ impl Sort { for tuple in input { tuples.push(tuple?); } - let tuples_with_keys: Vec<(Tuple, Vec<u8>)> = tuples - .into_iter() - .map(|tuple| { - let mut full_key = Vec::new(); - - for SortField { - expr, - nulls_first, - asc, - } in &sort_fields - { - let mut key = Vec::new(); - - expr.eval(&tuple)?.memcomparable_encode(&mut key)?; - key.push(if *nulls_first { u8::MIN } else { u8::MAX }); - - if !asc { - for byte in key.iter_mut() { - *byte ^= 0xFF; - } - } - full_key.extend(key); - } - Ok::<(Tuple, Vec<u8>), TypeError>((tuple, full_key)) - }) - .try_collect()?; - let mut tuples = radix_sort(tuples_with_keys); + let mut tuples = sort(&sort_fields, tuples)?; let len = limit.unwrap_or(tuples.len()); for tuple in tuples.drain(..len) { diff --git a/src/execution/executor/dql/values.rs b/src/execution/volcano/dql/values.rs similarity index 93% rename from src/execution/executor/dql/values.rs rename to src/execution/volcano/dql/values.rs index 80fae528..fee5ea5e 100644 --- a/src/execution/executor/dql/values.rs +++ b/src/execution/volcano/dql/values.rs @@ -1,4 +1,4 @@ -use crate::execution::executor::{BoxedExecutor, Executor}; +use crate::execution::volcano::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::planner::operator::values::ValuesOperator; use crate::storage::Transaction; diff --git a/src/execution/executor/mod.rs b/src/execution/volcano/mod.rs similarity index 58% rename from src/execution/executor/mod.rs rename to src/execution/volcano/mod.rs index 05de2e42..8335d948 100644 --- a/src/execution/executor/mod.rs +++ b/src/execution/volcano/mod.rs @@ -3,26 +3,26 @@ pub(crate) mod dml; pub(crate) mod dql; pub(crate) mod show; -use crate::execution::executor::ddl::alter_table::drop_column::DropColumn; -use crate::execution::executor::ddl::create_table::CreateTable; -use crate::execution::executor::ddl::drop_table::DropTable; -use crate::execution::executor::ddl::truncate::Truncate; -use crate::execution::executor::dml::copy_from_file::CopyFromFile; -use crate::execution::executor::dml::delete::Delete; -use crate::execution::executor::dml::insert::Insert; -use crate::execution::executor::dml::update::Update; -use
crate::execution::executor::dql::aggregate::hash_agg::HashAggExecutor; -use crate::execution::executor::dql::aggregate::simple_agg::SimpleAggExecutor; -use crate::execution::executor::dql::dummy::Dummy; -use crate::execution::executor::dql::filter::Filter; -use crate::execution::executor::dql::index_scan::IndexScan; -use crate::execution::executor::dql::join::hash_join::HashJoin; -use crate::execution::executor::dql::limit::Limit; -use crate::execution::executor::dql::projection::Projection; -use crate::execution::executor::dql::seq_scan::SeqScan; -use crate::execution::executor::dql::sort::Sort; -use crate::execution::executor::dql::values::Values; -use crate::execution::executor::show::show_table::ShowTables; +use crate::execution::volcano::ddl::alter_table::drop_column::DropColumn; +use crate::execution::volcano::ddl::create_table::CreateTable; +use crate::execution::volcano::ddl::drop_table::DropTable; +use crate::execution::volcano::ddl::truncate::Truncate; +use crate::execution::volcano::dml::copy_from_file::CopyFromFile; +use crate::execution::volcano::dml::delete::Delete; +use crate::execution::volcano::dml::insert::Insert; +use crate::execution::volcano::dml::update::Update; +use crate::execution::volcano::dql::aggregate::hash_agg::HashAggExecutor; +use crate::execution::volcano::dql::aggregate::simple_agg::SimpleAggExecutor; +use crate::execution::volcano::dql::dummy::Dummy; +use crate::execution::volcano::dql::filter::Filter; +use crate::execution::volcano::dql::index_scan::IndexScan; +use crate::execution::volcano::dql::join::hash_join::HashJoin; +use crate::execution::volcano::dql::limit::Limit; +use crate::execution::volcano::dql::projection::Projection; +use crate::execution::volcano::dql::seq_scan::SeqScan; +use crate::execution::volcano::dql::sort::Sort; +use crate::execution::volcano::dql::values::Values; +use crate::execution::volcano::show::show_table::ShowTables; use crate::execution::ExecutorError; use crate::planner::operator::Operator; use crate::planner::LogicalPlan; @@ -40,7 +40,7 @@ pub trait Executor<T: Transaction> { fn execute(self, transaction: &RefCell<T>) -> BoxedExecutor; } -pub fn build<T: Transaction>(plan: LogicalPlan, transaction: &RefCell<T>) -> BoxedExecutor { +pub fn build_stream<T: Transaction>(plan: LogicalPlan, transaction: &RefCell<T>) -> BoxedExecutor { let LogicalPlan { operator, mut childrens, @@ -49,7 +49,7 @@ pub fn build<T: Transaction>(plan: LogicalPlan, transaction: &RefCell<T>) -> Box match operator { Operator::Dummy => Dummy {}.execute(transaction), Operator::Aggregate(op) => { - let input = build(childrens.remove(0), transaction); + let input = build_stream(childrens.remove(0), transaction); if op.groupby_exprs.is_empty() { SimpleAggExecutor::from((op, input)).execute(transaction) @@ -58,18 +58,18 @@ pub fn build<T: Transaction>(plan: LogicalPlan, transaction: &RefCell<T>) -> Box } } Operator::Filter(op) => { - let input = build(childrens.remove(0), transaction); + let input = build_stream(childrens.remove(0), transaction); Filter::from((op, input)).execute(transaction) } Operator::Join(op) => { - let left_input = build(childrens.remove(0), transaction); - let right_input = build(childrens.remove(0), transaction); + let left_input = build_stream(childrens.remove(0), transaction); + let right_input = build_stream(childrens.remove(0), transaction); HashJoin::from((op, left_input, right_input)).execute(transaction) } Operator::Project(op) => { - let input = build(childrens.remove(0), transaction); + let input = build_stream(childrens.remove(0), transaction); Projection::from((op, input)).execute(transaction) } @@ -81,38 +81,38 @@ 
pub fn build<T: Transaction>(plan: LogicalPlan, transaction: &RefCell<T>) -> Box } } Operator::Sort(op) => { - let input = build(childrens.remove(0), transaction); + let input = build_stream(childrens.remove(0), transaction); Sort::from((op, input)).execute(transaction) } Operator::Limit(op) => { - let input = build(childrens.remove(0), transaction); + let input = build_stream(childrens.remove(0), transaction); Limit::from((op, input)).execute(transaction) } Operator::Insert(op) => { - let input = build(childrens.remove(0), transaction); + let input = build_stream(childrens.remove(0), transaction); Insert::from((op, input)).execute(transaction) } Operator::Update(op) => { - let input = build(childrens.remove(0), transaction); - let values = build(childrens.remove(0), transaction); + let input = build_stream(childrens.remove(0), transaction); + let values = build_stream(childrens.remove(0), transaction); Update::from((op, input, values)).execute(transaction) } Operator::Delete(op) => { - let input = build(childrens.remove(0), transaction); + let input = build_stream(childrens.remove(0), transaction); Delete::from((op, input)).execute(transaction) } Operator::Values(op) => Values::from(op).execute(transaction), Operator::AddColumn(op) => { - let input = build(childrens.remove(0), transaction); + let input = build_stream(childrens.remove(0), transaction); AddColumn::from((op, input)).execute(transaction) } Operator::DropColumn(op) => { - let input = build(childrens.remove(0), transaction); + let input = build_stream(childrens.remove(0), transaction); DropColumn::from((op, input)).execute(transaction) } Operator::CreateTable(op) => CreateTable::from(op).execute(transaction), diff --git a/src/execution/executor/show/mod.rs b/src/execution/volcano/show/mod.rs similarity index 100% rename from src/execution/executor/show/mod.rs rename to src/execution/volcano/show/mod.rs diff --git a/src/execution/executor/show/show_table.rs b/src/execution/volcano/show/show_table.rs similarity index 95% rename from src/execution/executor/show/show_table.rs rename to src/execution/volcano/show/show_table.rs index 0e463428..98751732 100644 --- a/src/execution/executor/show/show_table.rs +++ b/src/execution/volcano/show/show_table.rs @@ -1,6 +1,6 @@ use crate::catalog::ColumnCatalog; use crate::catalog::ColumnRef; -use crate::execution::executor::{BoxedExecutor, Executor}; +use crate::execution::volcano::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::planner::operator::show::ShowTablesOperator; use crate::storage::Transaction; diff --git a/src/expression/evaluator.rs b/src/expression/evaluator.rs index 5f87b63d..fcca9fe2 100644 --- a/src/expression/evaluator.rs +++ b/src/expression/evaluator.rs @@ -1,3 +1,4 @@ +use crate::catalog::ColumnSummary; use crate::expression::value_compute::{binary_op, unary_op}; use crate::expression::ScalarExpression; use crate::types::errors::TypeError; @@ -13,21 +14,26 @@ lazy_static! 
{ impl ScalarExpression { pub fn eval(&self, tuple: &Tuple) -> Result { - if let Some(value) = Self::eval_with_name(tuple, self.output_columns().name()) { + if let Some(value) = Self::eval_with_summary(tuple, self.output_column().summary()) { return Ok(value.clone()); } match &self { ScalarExpression::Constant(val) => Ok(val.clone()), ScalarExpression::ColumnRef(col) => { - let value = Self::eval_with_name(tuple, col.name()) + let value = Self::eval_with_summary(tuple, col.summary()) .unwrap_or(&NULL_VALUE) .clone(); Ok(value) } ScalarExpression::Alias { expr, alias } => { - if let Some(value) = Self::eval_with_name(tuple, alias) { + if let Some(value) = tuple + .columns + .iter() + .find_position(|tul_col| tul_col.name() == alias) + .map(|(i, _)| &tuple.values[i]) + { return Ok(value.clone()); } @@ -80,7 +86,7 @@ impl ScalarExpression { Ok(Arc::new(unary_op(&value, op)?)) } ScalarExpression::AggCall { .. } => { - let value = Self::eval_with_name(tuple, self.output_columns().name()) + let value = Self::eval_with_summary(tuple, self.output_column().summary()) .unwrap_or(&NULL_VALUE) .clone(); @@ -89,11 +95,11 @@ impl ScalarExpression { } } - fn eval_with_name<'a>(tuple: &'a Tuple, name: &str) -> Option<&'a ValueRef> { + fn eval_with_summary<'a>(tuple: &'a Tuple, summary: &ColumnSummary) -> Option<&'a ValueRef> { tuple .columns .iter() - .find_position(|tul_col| tul_col.name() == name) + .find_position(|tul_col| tul_col.summary() == summary) .map(|(i, _)| &tuple.values[i]) } } diff --git a/src/expression/mod.rs b/src/expression/mod.rs index 5922af94..a6334cc2 100644 --- a/src/expression/mod.rs +++ b/src/expression/mod.rs @@ -134,7 +134,7 @@ impl ScalarExpression { ) { // When `ScalarExpression` is a complex type, it itself is also a special Column if !only_column_ref { - vec.push(expr.output_columns()); + vec.push(expr.output_column()); } match expr { ScalarExpression::ColumnRef(col) => { @@ -197,7 +197,7 @@ impl ScalarExpression { } } - pub fn output_columns(&self) -> ColumnRef { + pub fn output_column(&self) -> ColumnRef { match self { ScalarExpression::ColumnRef(col) => col.clone(), ScalarExpression::Constant(value) => Arc::new(ColumnCatalog::new( @@ -220,7 +220,7 @@ impl ScalarExpression { } => { let args_str = args .iter() - .map(|expr| expr.output_columns().name().to_string()) + .map(|expr| expr.output_column().name().to_string()) .join(", "); let op = |allow_distinct, distinct| { if allow_distinct && distinct { @@ -251,9 +251,9 @@ impl ScalarExpression { } => { let column_name = format!( "({} {} {})", - left_expr.output_columns().name(), + left_expr.output_column().name(), op, - right_expr.output_columns().name(), + right_expr.output_column().name(), ); Arc::new(ColumnCatalog::new( @@ -264,7 +264,7 @@ impl ScalarExpression { )) } ScalarExpression::Unary { expr, op, ty } => { - let column_name = format!("{}{}", op, expr.output_columns().name()); + let column_name = format!("{}{}", op, expr.output_column().name()); Arc::new(ColumnCatalog::new( column_name, true, @@ -275,7 +275,7 @@ impl ScalarExpression { ScalarExpression::IsNull { negated, expr } => { let suffix = if *negated { "is not null" } else { "is null" }; Arc::new(ColumnCatalog::new( - format!("{} {}", expr.output_columns().name(), suffix), + format!("{} {}", expr.output_column().name(), suffix), true, ColumnDesc::new(LogicalType::Boolean, false, false, None), Some(self.clone()), @@ -288,13 +288,13 @@ impl ScalarExpression { } => { let args_string = args .iter() - .map(|arg| arg.output_columns().name().to_string()) + .map(|arg| 
arg.output_column().name().to_string()) .join(", "); let op_string = if *negated { "not in" } else { "in" }; Arc::new(ColumnCatalog::new( format!( "{} {} ({})", - expr.output_columns().name(), + expr.output_column().name(), op_string, args_string ), @@ -304,7 +304,7 @@ impl ScalarExpression { )) } ScalarExpression::TypeCast { expr, ty } => Arc::new(ColumnCatalog::new( - format!("CAST({} as {})", expr.output_columns().name(), ty), + format!("CAST({} as {})", expr.output_column().name(), ty), true, ColumnDesc::new(*ty, false, false, None), Some(self.clone()), diff --git a/src/expression/simplify.rs b/src/expression/simplify.rs index 2e045bc5..93849538 100644 --- a/src/expression/simplify.rs +++ b/src/expression/simplify.rs @@ -11,7 +11,7 @@ use std::collections::{Bound, HashSet}; use std::mem; use std::sync::Arc; -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Hash)] pub enum ConstantBinary { Scope { min: Bound, @@ -913,6 +913,7 @@ mod test { summary: ColumnSummary { id: Some(0), name: "c1".to_string(), + table_name: None, }, nullable: false, desc: ColumnDesc { diff --git a/src/lib.rs b/src/lib.rs index a1478ad2..eade08e9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,7 +11,8 @@ pub mod catalog; pub mod db; pub mod execution; pub mod expression; -pub mod marco; +#[cfg(feature = "marcos")] +pub mod marcos; mod optimizer; pub mod parser; pub mod planner; diff --git a/src/marco/mod.rs b/src/marcos/mod.rs similarity index 100% rename from src/marco/mod.rs rename to src/marcos/mod.rs diff --git a/src/optimizer/rule/column_pruning.rs b/src/optimizer/rule/column_pruning.rs index 54977fbb..2538ff5f 100644 --- a/src/optimizer/rule/column_pruning.rs +++ b/src/optimizer/rule/column_pruning.rs @@ -30,7 +30,7 @@ impl ColumnPruning { exprs: &mut Vec, ) { exprs.retain(|expr| { - if column_references.contains(expr.output_columns().summary()) { + if column_references.contains(expr.output_column().summary()) { return true; } expr.referenced_columns(false) diff --git a/src/optimizer/rule/simplification.rs b/src/optimizer/rule/simplification.rs index 68aadadd..3b5738f4 100644 --- a/src/optimizer/rule/simplification.rs +++ b/src/optimizer/rule/simplification.rs @@ -249,6 +249,7 @@ mod test { summary: ColumnSummary { id: Some(0), name: "c1".to_string(), + table_name: Some(Arc::new("t1".to_string())), }, nullable: false, desc: ColumnDesc { @@ -263,6 +264,7 @@ mod test { summary: ColumnSummary { id: Some(1), name: "c2".to_string(), + table_name: Some(Arc::new("t1".to_string())), }, nullable: false, desc: ColumnDesc { diff --git a/src/planner/mod.rs b/src/planner/mod.rs index d46b33d6..6b0a5243 100644 --- a/src/planner/mod.rs +++ b/src/planner/mod.rs @@ -3,7 +3,7 @@ pub mod operator; use crate::catalog::TableName; use crate::planner::operator::Operator; -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Hash)] pub struct LogicalPlan { pub operator: Operator, pub childrens: Vec, diff --git a/src/planner/operator/aggregate.rs b/src/planner/operator/aggregate.rs index 8d5973bf..c6ce8a9d 100644 --- a/src/planner/operator/aggregate.rs +++ b/src/planner/operator/aggregate.rs @@ -1,7 +1,7 @@ use crate::planner::LogicalPlan; use crate::{expression::ScalarExpression, planner::operator::Operator}; -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Hash)] pub struct AggregateOperator { pub groupby_exprs: Vec, pub agg_calls: Vec, diff --git a/src/planner/operator/alter_table/add_column.rs b/src/planner/operator/alter_table/add_column.rs index 
8862def4..6e87851a 100644 --- a/src/planner/operator/alter_table/add_column.rs +++ b/src/planner/operator/alter_table/add_column.rs @@ -1,6 +1,6 @@ use crate::catalog::{ColumnCatalog, TableName}; -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Hash)] pub struct AddColumnOperator { pub table_name: TableName, pub if_not_exists: bool, diff --git a/src/planner/operator/alter_table/drop_column.rs b/src/planner/operator/alter_table/drop_column.rs index ea0a20da..8cf5ace5 100644 --- a/src/planner/operator/alter_table/drop_column.rs +++ b/src/planner/operator/alter_table/drop_column.rs @@ -1,6 +1,6 @@ use crate::catalog::TableName; -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Hash)] pub struct DropColumnOperator { pub table_name: TableName, pub column_name: String, diff --git a/src/planner/operator/copy_from_file.rs b/src/planner/operator/copy_from_file.rs index 12c1413d..f5a3084e 100644 --- a/src/planner/operator/copy_from_file.rs +++ b/src/planner/operator/copy_from_file.rs @@ -1,7 +1,7 @@ use crate::binder::copy::ExtSource; use crate::catalog::ColumnRef; -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Hash)] pub struct CopyFromFileOperator { pub table: String, pub source: ExtSource, diff --git a/src/planner/operator/copy_to_file.rs b/src/planner/operator/copy_to_file.rs index 1eb65ed8..2626c512 100644 --- a/src/planner/operator/copy_to_file.rs +++ b/src/planner/operator/copy_to_file.rs @@ -1,6 +1,6 @@ use crate::binder::copy::ExtSource; -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Hash)] pub struct CopyToFileOperator { pub source: ExtSource, } diff --git a/src/planner/operator/create_table.rs b/src/planner/operator/create_table.rs index a5d07eb3..49f93e1d 100644 --- a/src/planner/operator/create_table.rs +++ b/src/planner/operator/create_table.rs @@ -1,6 +1,6 @@ use crate::catalog::{ColumnCatalog, TableName}; -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Hash)] pub struct CreateTableOperator { /// Table name to insert to pub table_name: TableName, diff --git a/src/planner/operator/delete.rs b/src/planner/operator/delete.rs index 09bb023e..04261672 100644 --- a/src/planner/operator/delete.rs +++ b/src/planner/operator/delete.rs @@ -1,6 +1,6 @@ use crate::catalog::TableName; -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Hash)] pub struct DeleteOperator { pub table_name: TableName, } diff --git a/src/planner/operator/drop_table.rs b/src/planner/operator/drop_table.rs index 5d7b022b..731a0087 100644 --- a/src/planner/operator/drop_table.rs +++ b/src/planner/operator/drop_table.rs @@ -1,6 +1,6 @@ use crate::catalog::TableName; -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Hash)] pub struct DropTableOperator { /// Table name to insert to pub table_name: TableName, diff --git a/src/planner/operator/filter.rs b/src/planner/operator/filter.rs index fc25ad98..41fd25aa 100644 --- a/src/planner/operator/filter.rs +++ b/src/planner/operator/filter.rs @@ -5,7 +5,7 @@ use crate::planner::LogicalPlan; use super::Operator; -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Hash)] pub struct FilterOperator { pub predicate: ScalarExpression, pub having: bool, diff --git a/src/planner/operator/insert.rs b/src/planner/operator/insert.rs index 0c2fa242..561947dc 100644 --- a/src/planner/operator/insert.rs +++ b/src/planner/operator/insert.rs @@ -1,6 +1,6 @@ use crate::catalog::TableName; 
-#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Hash)] pub struct InsertOperator { pub table_name: TableName, pub is_overwrite: bool, diff --git a/src/planner/operator/join.rs b/src/planner/operator/join.rs index 742d978b..459c147b 100644 --- a/src/planner/operator/join.rs +++ b/src/planner/operator/join.rs @@ -3,7 +3,7 @@ use crate::planner::LogicalPlan; use super::Operator; -#[derive(Debug, PartialEq, Clone, Copy)] +#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)] pub enum JoinType { Inner, Left, @@ -11,7 +11,7 @@ pub enum JoinType { Full, Cross, } -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum JoinCondition { On { /// Equijoin clause expressed as pairs of (left, right) join columns @@ -22,7 +22,7 @@ pub enum JoinCondition { None, } -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Hash)] pub struct JoinOperator { pub on: JoinCondition, pub join_type: JoinType, diff --git a/src/planner/operator/limit.rs b/src/planner/operator/limit.rs index 12280f33..c72ff1e7 100644 --- a/src/planner/operator/limit.rs +++ b/src/planner/operator/limit.rs @@ -2,7 +2,7 @@ use crate::planner::LogicalPlan; use super::Operator; -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Hash)] pub struct LimitOperator { pub offset: Option, pub limit: Option, diff --git a/src/planner/operator/mod.rs b/src/planner/operator/mod.rs index 5c443043..5dc28385 100644 --- a/src/planner/operator/mod.rs +++ b/src/planner/operator/mod.rs @@ -38,7 +38,7 @@ use self::{ scan::ScanOperator, sort::SortOperator, }; -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Hash)] pub enum Operator { // DQL Dummy, diff --git a/src/planner/operator/project.rs b/src/planner/operator/project.rs index ca7811e9..c9a8ee7c 100644 --- a/src/planner/operator/project.rs +++ b/src/planner/operator/project.rs @@ -1,6 +1,6 @@ use crate::expression::ScalarExpression; -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Hash)] pub struct ProjectOperator { pub exprs: Vec, } diff --git a/src/planner/operator/scan.rs b/src/planner/operator/scan.rs index f11d9cc9..2782c9d9 100644 --- a/src/planner/operator/scan.rs +++ b/src/planner/operator/scan.rs @@ -8,7 +8,7 @@ use itertools::Itertools; use super::Operator; -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Hash)] pub struct ScanOperator { pub index_metas: Vec, diff --git a/src/planner/operator/show.rs b/src/planner/operator/show.rs index 9d726726..7bd3585e 100644 --- a/src/planner/operator/show.rs +++ b/src/planner/operator/show.rs @@ -1,2 +1,2 @@ -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Hash)] pub struct ShowTablesOperator {} diff --git a/src/planner/operator/sort.rs b/src/planner/operator/sort.rs index 0006ab20..63bb74e2 100644 --- a/src/planner/operator/sort.rs +++ b/src/planner/operator/sort.rs @@ -1,6 +1,6 @@ use crate::expression::ScalarExpression; -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Hash)] pub struct SortField { pub expr: ScalarExpression, pub asc: bool, @@ -17,7 +17,7 @@ impl SortField { } } -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Hash)] pub struct SortOperator { pub sort_fields: Vec, /// Support push down limit to sort plan. 
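Aside on the blanket `Eq`/`Hash` derives in the planner hunks above: once `LogicalPlan` and every operator derive `Hash`, an entire plan tree can serve as a hash-map key — plausibly useful to the new `codegen_execute` path (e.g. caching one compiled artifact per distinct plan), though the consumer is not shown in this diff. A minimal sketch with hypothetical stand-in types (not kip-sql's real `LogicalPlan`/`Operator`):

```rust
use std::collections::HashMap;

// Stand-ins for LogicalPlan/Operator; names and variants here are invented.
// The point is only that deriving Eq + Hash over the whole tree makes a
// plan usable as a map key.
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
enum Op {
    Dummy,
    Limit(Option<usize>),
}

#[derive(Debug, PartialEq, Eq, Clone, Hash)]
struct Plan {
    operator: Op,
    childrens: Vec<Plan>,
}

fn main() {
    let plan = Plan {
        operator: Op::Limit(Some(1)),
        childrens: vec![Plan {
            operator: Op::Dummy,
            childrens: vec![],
        }],
    };

    // With Eq + Hash derived on the whole tree, a plan can key a cache:
    let mut compiled: HashMap<Plan, String> = HashMap::new();
    compiled.insert(plan.clone(), "compiled artifact".to_string());
    assert!(compiled.contains_key(&plan));
}
```
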
diff --git a/src/planner/operator/truncate.rs b/src/planner/operator/truncate.rs index 1b63a5d0..718f4bcb 100644 --- a/src/planner/operator/truncate.rs +++ b/src/planner/operator/truncate.rs @@ -1,6 +1,6 @@ use crate::catalog::TableName; -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Hash)] pub struct TruncateOperator { /// Table name to insert to pub table_name: TableName, } diff --git a/src/planner/operator/update.rs b/src/planner/operator/update.rs index ed37c72d..f4e61b56 100644 --- a/src/planner/operator/update.rs +++ b/src/planner/operator/update.rs @@ -1,6 +1,6 @@ use crate::catalog::TableName; -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Hash)] pub struct UpdateOperator { pub table_name: TableName, } diff --git a/src/planner/operator/values.rs b/src/planner/operator/values.rs index 1a88c753..d0d11f9b 100644 --- a/src/planner/operator/values.rs +++ b/src/planner/operator/values.rs @@ -1,7 +1,7 @@ use crate::catalog::ColumnRef; use crate::types::value::ValueRef; -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Hash)] pub struct ValuesOperator { pub rows: Vec<Vec<ValueRef>>, pub columns: Vec<ColumnRef>, } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 3ce43192..818282a7 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -294,7 +294,7 @@ pub(crate) fn tuple_projection( for expr in projections.iter() { values.push(expr.eval(&tuple)?); - columns.push(expr.output_columns()); + columns.push(expr.output_column()); } if let Some(num) = limit { diff --git a/src/types/index.rs b/src/types/index.rs index c06df10d..f6660129 100644 --- a/src/types/index.rs +++ b/src/types/index.rs @@ -6,7 +6,7 @@ use std::sync::Arc; pub type IndexId = u32; pub type IndexMetaRef = Arc<IndexMeta>; -#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)] pub struct IndexMeta { pub id: IndexId, pub column_ids: Vec<ColumnId>,