From b7222af9eb757eb3660b0c1638ff0542793b6d46 Mon Sep 17 00:00:00 2001 From: Kould <2435992353@qq.com> Date: Fri, 26 Jan 2024 19:27:56 +0800 Subject: [PATCH] feat: completed the integration of CBO into the optimizer(currently only single column index selection is supported) --- src/binder/alter_table.rs | 2 + src/binder/analyze.rs | 1 + src/binder/copy.rs | 2 + src/binder/create_table.rs | 1 + src/binder/delete.rs | 1 + src/binder/drop_table.rs | 1 + src/binder/insert.rs | 2 + src/binder/select.rs | 3 ++ src/binder/show.rs | 1 + src/binder/truncate.rs | 1 + src/binder/update.rs | 1 + src/db.rs | 51 ++++++++++++++++--- src/execution/volcano/dml/analyze.rs | 9 +++- src/execution/volcano/dql/index_scan.rs | 26 ++++------ src/execution/volcano/mod.rs | 17 ++++--- src/optimizer/core/histogram.rs | 14 ++--- src/optimizer/core/memo.rs | 36 ++++++++++--- src/optimizer/core/opt_expr.rs | 1 + src/optimizer/heuristic/graph.rs | 47 +++++++---------- src/optimizer/heuristic/matcher.rs | 4 ++ src/optimizer/heuristic/optimizer.rs | 20 +++++++- .../rule/implementation/dml/analyze.rs | 27 ++++++++++ src/optimizer/rule/implementation/dml/mod.rs | 1 + src/optimizer/rule/implementation/dql/scan.rs | 4 +- src/optimizer/rule/implementation/marcos.rs | 2 +- src/optimizer/rule/implementation/mod.rs | 6 +++ src/planner/mod.rs | 3 +- src/planner/operator/aggregate.rs | 1 + src/planner/operator/filter.rs | 1 + src/planner/operator/join.rs | 1 + src/planner/operator/limit.rs | 1 + src/planner/operator/mod.rs | 3 +- src/planner/operator/scan.rs | 1 + src/storage/kip.rs | 30 ++++++++--- src/storage/mod.rs | 4 ++ tests/slt/analyze.slt | 48 +++++++++++++++++ 36 files changed, 285 insertions(+), 89 deletions(-) create mode 100644 src/optimizer/rule/implementation/dml/analyze.rs create mode 100644 tests/slt/analyze.slt diff --git a/src/binder/alter_table.rs b/src/binder/alter_table.rs index 1e4b9948..827fae1e 100644 --- a/src/binder/alter_table.rs +++ b/src/binder/alter_table.rs @@ -41,6 +41,7 @@ impl<'a, T: Transaction> Binder<'a, T> { column, }), childrens: vec![plan], + physical_option: None, } } AlterTableOperation::DropColumn { @@ -58,6 +59,7 @@ impl<'a, T: Transaction> Binder<'a, T> { column_name, }), childrens: vec![plan], + physical_option: None, } } AlterTableOperation::DropPrimaryKey => todo!(), diff --git a/src/binder/analyze.rs b/src/binder/analyze.rs index dba44cd8..22f08b04 100644 --- a/src/binder/analyze.rs +++ b/src/binder/analyze.rs @@ -35,6 +35,7 @@ impl<'a, T: Transaction> Binder<'a, T> { columns, }), childrens: vec![scan_op], + physical_option: None, }; Ok(plan) } diff --git a/src/binder/copy.rs b/src/binder/copy.rs index 350e35b7..f4f26e98 100644 --- a/src/binder/copy.rs +++ b/src/binder/copy.rs @@ -85,6 +85,7 @@ impl<'a, T: Transaction> Binder<'a, T> { Ok(LogicalPlan { operator: Operator::CopyToFile(CopyToFileOperator { source: ext_source }), childrens: vec![], + physical_option: None, }) } else { // COPY FROM @@ -95,6 +96,7 @@ impl<'a, T: Transaction> Binder<'a, T> { table: table_name.to_string(), }), childrens: vec![], + physical_option: None, }) } } else { diff --git a/src/binder/create_table.rs b/src/binder/create_table.rs index e4a976c1..883812bf 100644 --- a/src/binder/create_table.rs +++ b/src/binder/create_table.rs @@ -86,6 +86,7 @@ impl<'a, T: Transaction> Binder<'a, T> { if_not_exists, }), childrens: vec![], + physical_option: None, }; Ok(plan) } diff --git a/src/binder/delete.rs b/src/binder/delete.rs index e6a4b4ff..bac496b3 100644 --- a/src/binder/delete.rs +++ b/src/binder/delete.rs @@ 
-49,6 +49,7 @@ impl<'a, T: Transaction> Binder<'a, T> { primary_key_column, }), childrens: vec![plan], + physical_option: None, }) } else { unreachable!("only table") diff --git a/src/binder/drop_table.rs b/src/binder/drop_table.rs index 2eec4562..ad878f95 100644 --- a/src/binder/drop_table.rs +++ b/src/binder/drop_table.rs @@ -22,6 +22,7 @@ impl<'a, T: Transaction> Binder<'a, T> { if_exists: *if_exists, }), childrens: vec![], + physical_option: None, }; Ok(plan) } diff --git a/src/binder/insert.rs b/src/binder/insert.rs index 2a4e6c36..e31bac1c 100644 --- a/src/binder/insert.rs +++ b/src/binder/insert.rs @@ -86,6 +86,7 @@ impl<'a, T: Transaction> Binder<'a, T> { is_overwrite, }), childrens: vec![values_plan], + physical_option: None, }) } else { Err(BindError::InvalidTable(format!( @@ -103,6 +104,7 @@ impl<'a, T: Transaction> Binder<'a, T> { LogicalPlan { operator: Operator::Values(ValuesOperator { rows, columns }), childrens: vec![], + physical_option: None, } } } diff --git a/src/binder/select.rs b/src/binder/select.rs index f13ace9a..9a1a2877 100644 --- a/src/binder/select.rs +++ b/src/binder/select.rs @@ -118,6 +118,7 @@ impl<'a, T: Transaction> Binder<'a, T> { return Ok(LogicalPlan { operator: Operator::Dummy, childrens: vec![], + physical_option: None, }); } @@ -340,6 +341,7 @@ impl<'a, T: Transaction> Binder<'a, T> { LogicalPlan { operator: Operator::Project(ProjectOperator { exprs: select_list }), childrens: vec![children], + physical_option: None, } } @@ -350,6 +352,7 @@ impl<'a, T: Transaction> Binder<'a, T> { limit: None, }), childrens: vec![children], + physical_option: None, } } diff --git a/src/binder/show.rs b/src/binder/show.rs index f5855a5d..4b1da3be 100644 --- a/src/binder/show.rs +++ b/src/binder/show.rs @@ -9,6 +9,7 @@ impl<'a, T: Transaction> Binder<'a, T> { let plan = LogicalPlan { operator: Operator::Show(ShowTablesOperator {}), childrens: vec![], + physical_option: None, }; Ok(plan) } diff --git a/src/binder/truncate.rs b/src/binder/truncate.rs index 4dfb920c..d3a0e1a8 100644 --- a/src/binder/truncate.rs +++ b/src/binder/truncate.rs @@ -15,6 +15,7 @@ impl<'a, T: Transaction> Binder<'a, T> { let plan = LogicalPlan { operator: Operator::Truncate(TruncateOperator { table_name }), childrens: vec![], + physical_option: None, }; Ok(plan) } diff --git a/src/binder/update.rs b/src/binder/update.rs index 9703cf43..e0743c6a 100644 --- a/src/binder/update.rs +++ b/src/binder/update.rs @@ -58,6 +58,7 @@ impl<'a, T: Transaction> Binder<'a, T> { Ok(LogicalPlan { operator: Operator::Update(UpdateOperator { table_name }), childrens: vec![plan, values_plan], + physical_option: None, }) } else { unreachable!("only table") diff --git a/src/db.rs b/src/db.rs index 2ed847cb..0437c031 100644 --- a/src/db.rs +++ b/src/db.rs @@ -6,8 +6,10 @@ use std::path::PathBuf; use crate::binder::{BindError, Binder, BinderContext}; use crate::execution::volcano::{build_stream, try_collect}; use crate::execution::ExecutorError; +use crate::optimizer::core::histogram::HistogramLoader; use crate::optimizer::heuristic::batch::HepBatchStrategy; use crate::optimizer::heuristic::optimizer::HepOptimizer; +use crate::optimizer::rule::implementation::ImplementationRuleImpl; use crate::optimizer::rule::normalization::NormalizationRuleImpl; use crate::optimizer::OptimizerError; use crate::parser::parse_sql; @@ -81,7 +83,7 @@ impl Database { /// Run SQL queries. 
pub async fn run<T: AsRef<str>>(&self, sql: T) -> Result<Vec<Tuple>, DatabaseError> { let transaction = self.storage.transaction().await?; - let (plan, _) = Self::build_plan(sql, &transaction)?; + let (plan, _) = Self::build_plan::<T, S::TransactionType>(sql, &transaction)?; Self::run_volcano(transaction, plan).await } @@ -107,8 +109,8 @@ impl Database { }) } - pub fn build_plan<T: AsRef<str>>( - sql: T, + pub fn build_plan<V: AsRef<str>, T: Transaction>( + sql: V, transaction: &<S as Storage>::TransactionType, ) -> Result<(LogicalPlan, Statement), DatabaseError> { // parse @@ -127,13 +129,17 @@ impl Database { let source_plan = binder.bind(&stmts[0])?; // println!("source_plan plan: {:#?}", source_plan); - let best_plan = Self::default_optimizer(source_plan).find_best()?; + let best_plan = + Self::default_optimizer(source_plan, &transaction.histogram_loader())?.find_best()?; // println!("best_plan plan: {:#?}", best_plan); Ok((best_plan, stmts.remove(0))) } - pub(crate) fn default_optimizer(source_plan: LogicalPlan) -> HepOptimizer { + pub(crate) fn default_optimizer<T: Transaction>( + source_plan: LogicalPlan, + loader: &HistogramLoader<'_, T>, + ) -> Result<HepOptimizer, OptimizerError> { HepOptimizer::new(source_plan) .batch( "Column Pruning".to_string(), @@ -174,6 +180,36 @@ impl Database { NormalizationRuleImpl::EliminateLimits, ], ) + .build_memo( + loader, + &[ + // DQL + ImplementationRuleImpl::SimpleAggregate, + ImplementationRuleImpl::GroupByAggregate, + ImplementationRuleImpl::Dummy, + ImplementationRuleImpl::Filter, + ImplementationRuleImpl::HashJoin, + ImplementationRuleImpl::Limit, + ImplementationRuleImpl::Projection, + ImplementationRuleImpl::SeqScan, + ImplementationRuleImpl::IndexScan, + ImplementationRuleImpl::Sort, + ImplementationRuleImpl::Values, + // DML + ImplementationRuleImpl::Analyze, + ImplementationRuleImpl::CopyFromFile, + ImplementationRuleImpl::CopyToFile, + ImplementationRuleImpl::Delete, + ImplementationRuleImpl::Insert, + ImplementationRuleImpl::Update, + // DDL + ImplementationRuleImpl::AddColumn, + ImplementationRuleImpl::CreateTable, + ImplementationRuleImpl::DropColumn, + ImplementationRuleImpl::DropTable, + ImplementationRuleImpl::Truncate, + ], + ) } } @@ -183,8 +219,9 @@ pub struct DBTransaction { impl<S: Storage> DBTransaction<S> { pub async fn run<T: AsRef<str>>(&mut self, sql: T) -> Result<Vec<Tuple>, DatabaseError> { - let (plan, _) = - Database::<S>::build_plan(sql, unsafe { self.inner.as_ptr().as_ref().unwrap() })?; + let (plan, _) = Database::<S>::build_plan::<T, S::TransactionType>(sql, unsafe { + self.inner.as_ptr().as_ref().unwrap() + })?; let mut stream = build_stream(plan, &self.inner); Ok(try_collect(&mut stream).await?)
diff --git a/src/execution/volcano/dml/analyze.rs b/src/execution/volcano/dml/analyze.rs index 06b805ea..bbc38bf8 100644 --- a/src/execution/volcano/dml/analyze.rs +++ b/src/execution/volcano/dml/analyze.rs @@ -2,6 +2,7 @@ use crate::catalog::{ColumnCatalog, ColumnRef, TableMeta, TableName}; use crate::execution::volcano::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::optimizer::core::histogram::HistogramBuilder; +use crate::optimizer::OptimizerError; use crate::planner::operator::analyze::AnalyzeOperator; use crate::storage::Transaction; use crate::types::tuple::Tuple; @@ -89,11 +90,15 @@ impl Analyze { .join(ts.to_string()); fs::create_dir_all(&dir_path)?; - let mut meta = TableMeta::empty(table_name); + let mut meta = TableMeta::empty(table_name.clone()); for (column_id, builder) in builders { let path = dir_path.join(column_id.unwrap().to_string()); - let histogram = builder.build(DEFAULT_NUM_OF_BUCKETS)?; + let histogram = match builder.build(DEFAULT_NUM_OF_BUCKETS) { + Ok(histogram) => histogram, + Err(OptimizerError::TooManyBuckets) => continue, + err => err?, + }; histogram.to_file(&path)?; diff --git a/src/execution/volcano/dql/index_scan.rs b/src/execution/volcano/dql/index_scan.rs index 28662d03..7516da17 100644 --- a/src/execution/volcano/dql/index_scan.rs +++ b/src/execution/volcano/dql/index_scan.rs @@ -2,18 +2,21 @@ use crate::execution::volcano::{BoxedExecutor, Executor}; use crate::execution::ExecutorError; use crate::planner::operator::scan::ScanOperator; use crate::storage::{Iter, Transaction}; -use crate::types::index::IndexInfo; +use crate::types::index::IndexMetaRef; use crate::types::tuple::Tuple; use futures_async_stream::try_stream; use std::cell::RefCell; +use crate::expression::simplify::ConstantBinary; pub(crate) struct IndexScan { op: ScanOperator, + index_by: IndexMetaRef, + binaries: Vec<ConstantBinary>, } -impl From<ScanOperator> for IndexScan { - fn from(op: ScanOperator) -> Self { - IndexScan { op } +impl From<(ScanOperator, IndexMetaRef, Vec<ConstantBinary>)> for IndexScan { + fn from((op, index_by, binaries): (ScanOperator, IndexMetaRef, Vec<ConstantBinary>)) -> Self { + IndexScan { op, index_by, binaries } } } @@ -30,21 +33,12 @@ impl IndexScan { table_name, columns, limit, - mut index_infos, .. } = self.op; - if let Some(IndexInfo { - meta, - binaries: Some(binaries), - }) = index_infos.pop() - { - let mut iter = transaction.read_by_index(table_name, limit, columns, meta, binaries)?; + let mut iter = transaction.read_by_index(table_name, limit, columns, self.index_by, self.binaries)?; - while let Some(tuple) = iter.next_tuple()? { - yield tuple; - } - } else { - return Err(ExecutorError::InvalidIndex); + while let Some(tuple) = iter.next_tuple()?
{ + yield tuple; } } } diff --git a/src/execution/volcano/mod.rs b/src/execution/volcano/mod.rs index fc16dbe7..f08de383 100644 --- a/src/execution/volcano/mod.rs +++ b/src/execution/volcano/mod.rs @@ -24,13 +24,15 @@ use crate::execution::volcano::dql::sort::Sort; use crate::execution::volcano::dql::values::Values; use crate::execution::volcano::show::show_table::ShowTables; use crate::execution::ExecutorError; -use crate::planner::operator::Operator; +use crate::planner::operator::{Operator, PhysicalOption}; use crate::planner::LogicalPlan; use crate::storage::Transaction; use crate::types::tuple::Tuple; use futures::stream::BoxStream; use futures::TryStreamExt; use std::cell::RefCell; +use crate::execution::volcano::dql::index_scan::IndexScan; +use crate::types::index::IndexInfo; use self::ddl::add_column::AddColumn; @@ -44,6 +46,7 @@ pub fn build_stream(plan: LogicalPlan, transaction: &RefCell) let LogicalPlan { operator, mut childrens, + .. } = plan; match operator { @@ -74,13 +77,11 @@ pub fn build_stream(plan: LogicalPlan, transaction: &RefCell) Projection::from((op, input)).execute(transaction) } Operator::Scan(op) => { - SeqScan::from(op).execute(transaction) - // Fixme - // if op.index_infos.is_empty() { - // SeqScan::from(op).execute(transaction) - // } else { - // IndexScan::from(op).execute(transaction) - // } + if let Some(PhysicalOption::IndexScan(IndexInfo { meta, binaries: Some(binaries) })) = plan.physical_option { + IndexScan::from((op, meta, binaries)).execute(transaction) + } else { + SeqScan::from(op).execute(transaction) + } } Operator::Sort(op) => { let input = build_stream(childrens.remove(0), transaction); diff --git a/src/optimizer/core/histogram.rs b/src/optimizer/core/histogram.rs index 0f4296e9..b25e48bd 100644 --- a/src/optimizer/core/histogram.rs +++ b/src/optimizer/core/histogram.rs @@ -11,7 +11,6 @@ use serde::{Deserialize, Serialize}; use std::cmp::Ordering; use std::collections::Bound; use std::fs::OpenOptions; -use std::hash::RandomState; use std::io::{Read, Write}; use std::path::Path; use std::sync::Arc; @@ -31,13 +30,13 @@ pub struct HistogramBuilder { } pub struct HistogramLoader<'a, T: Transaction> { - cache: ShardingLruCache<TableName, Vec<Histogram>>, + cache: &'a ShardingLruCache<TableName, Vec<Histogram>>, tx: &'a T, } // Equal depth histogram #[derive(Debug, PartialEq, Serialize, Deserialize)] -pub(crate) struct Histogram { +pub struct Histogram { column_id: ColumnId, data_type: LogicalType, @@ -158,10 +157,11 @@ impl HistogramBuilder { } impl<'a, T: Transaction> HistogramLoader<'a, T> { - pub fn new(tx: &'a T) -> Result<HistogramLoader<'a, T>, OptimizerError> { - let cache = ShardingLruCache::new(128, 16, RandomState::new())?; - - Ok(HistogramLoader { cache, tx }) + pub fn new( + tx: &'a T, + cache: &'a ShardingLruCache<TableName, Vec<Histogram>>, + ) -> HistogramLoader<'a, T> { + HistogramLoader { cache, tx } } pub fn load(&self, table_name: TableName) -> Result<&Vec<Histogram>, OptimizerError> { diff --git a/src/optimizer/core/memo.rs b/src/optimizer/core/memo.rs index 01807db4..95499295 100644 --- a/src/optimizer/core/memo.rs +++ b/src/optimizer/core/memo.rs @@ -8,10 +8,11 @@ use crate::optimizer::rule::implementation::ImplementationRuleImpl; use crate::optimizer::OptimizerError; use crate::planner::operator::PhysicalOption; use crate::storage::Transaction; +use std::cmp::Ordering; #[derive(Debug, Clone)] pub struct Expression { - pub(crate) ops: Vec<PhysicalOption>, + pub(crate) op: PhysicalOption, pub(crate) cost: Option<usize>, } @@ -56,20 +57,33 @@ impl Memo { Ok(Memo { groups }) } + + pub(crate) fn cheapest_physical_option(&self, group_index: usize) -> Option<PhysicalOption> { +
self.groups[group_index] .exprs .iter() .min_by(|expr_1, expr_2| match (expr_1.cost, expr_2.cost) { (Some(cost_1), Some(cost_2)) => cost_1.cmp(&cost_2), (None, Some(_)) => Ordering::Greater, (Some(_), None) => Ordering::Less, (None, None) => Ordering::Equal, }) .map(|expr| expr.op.clone()) + } } #[cfg(test)] mod tests { use crate::binder::{Binder, BinderContext}; use crate::db::{Database, DatabaseError}; - use crate::optimizer::core::histogram::HistogramLoader; use crate::optimizer::core::memo::Memo; use crate::optimizer::heuristic::batch::HepBatchStrategy; use crate::optimizer::heuristic::graph::HepGraph; use crate::optimizer::heuristic::optimizer::HepOptimizer; use crate::optimizer::rule::implementation::ImplementationRuleImpl; use crate::optimizer::rule::normalization::NormalizationRuleImpl; - use crate::storage::Storage; + use crate::planner::operator::PhysicalOption; + use crate::storage::{Storage, Transaction}; use tempfile::TempDir; #[tokio::test] @@ -121,9 +135,19 @@ mod tests { ImplementationRuleImpl::IndexScan, ]; - let memo = Memo::new(&graph, &HistogramLoader::new(&transaction)?, &rules)?; - - println!("{:#?}", memo); + let memo = Memo::new(&graph, &transaction.histogram_loader(), &rules)?; + let best_plan = graph.to_plan(Some(&memo)); + let exprs = &memo.groups[3]; + + assert_eq!(exprs.exprs.len(), 2); + assert_eq!(exprs.exprs[0].cost, Some(1000)); + assert_eq!(exprs.exprs[0].op, PhysicalOption::SeqScan); + assert_eq!(exprs.exprs[1].cost, Some(1920)); + assert!(matches!(exprs.exprs[1].op, PhysicalOption::IndexScan(_))); + assert_eq!( + best_plan.as_ref().unwrap().childrens[0].childrens[0].childrens[0].physical_option, + Some(PhysicalOption::SeqScan) + ); Ok(()) } diff --git a/src/optimizer/core/opt_expr.rs b/src/optimizer/core/opt_expr.rs index c29e7b8e..5b7be4ea 100644 --- a/src/optimizer/core/opt_expr.rs +++ b/src/optimizer/core/opt_expr.rs @@ -43,6 +43,7 @@ impl OptExpr { LogicalPlan { operator: self.root.clone(), childrens, + physical_option: None, } } } diff --git a/src/optimizer/heuristic/graph.rs b/src/optimizer/heuristic/graph.rs index 6532c94e..523e9f6a 100644 --- a/src/optimizer/heuristic/graph.rs +++ b/src/optimizer/heuristic/graph.rs @@ -1,3 +1,4 @@ +use crate::optimizer::core::memo::Memo; use crate::optimizer::core::opt_expr::OptExprNodeId; use crate::optimizer::heuristic::batch::HepMatchOrder; use crate::planner::operator::Operator; @@ -10,7 +11,7 @@ use std::mem; /// HepNodeId is used in optimizer to identify a node. pub type HepNodeId = NodeIndex<usize>; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct HepGraph { graph: StableDiGraph<Operator, usize, usize>, root_index: HepNodeId, @@ -24,6 +25,7 @@ impl HepGraph { LogicalPlan { operator, childrens, + .. }: LogicalPlan, ) -> HepNodeId { let index = graph.add_node(operator); @@ -163,11 +165,6 @@ impl HepGraph { &mut self.graph[node_id] } - // FIXME - pub fn to_plan(&self) -> LogicalPlan { - self.to_plan_with_index(self.root_index) - } - /// If input node is join, we use the edge weight to control the join children order.
pub fn children_at(&self, id: HepNodeId) -> Box<dyn Iterator<Item = HepNodeId> + '_> { Box::new( @@ -185,27 +182,25 @@ impl HepGraph { .map(|edge| edge.target()) } - pub fn to_plan_with_index(&self, start_index: HepNodeId) -> LogicalPlan { - let mut root_plan = LogicalPlan { - operator: self.operator(start_index).clone(), - childrens: vec![], - }; - - self.build_childrens(&mut root_plan, start_index); - - root_plan + pub fn to_plan(mut self, memo: Option<&Memo>) -> Option<LogicalPlan> { + self.build_childrens(self.root_index, memo) } - fn build_childrens(&self, plan: &mut LogicalPlan, start: HepNodeId) { - for child_id in self.children_at(start) { - let mut child_plan = LogicalPlan { - operator: self.operator(child_id).clone(), - childrens: vec![], - }; + fn build_childrens(&mut self, start: HepNodeId, memo: Option<&Memo>) -> Option<LogicalPlan> { + let mut childrens = Vec::with_capacity(2); + let physical_option = memo.and_then(|memo| memo.cheapest_physical_option(start.index())); - self.build_childrens(&mut child_plan, child_id); - plan.childrens.push(child_plan); + for child_id in self.children_at(start).collect_vec() { + if let Some(child_plan) = self.build_childrens(child_id, memo) { + childrens.push(child_plan); + } } + + self.graph.remove_node(start).map(|operator| LogicalPlan { operator, childrens, physical_option, }) } } @@ -350,14 +345,10 @@ mod tests { let plan = select_sql_run("select * from t1 left join t2 on c1 = c3").await?; let graph = HepGraph::new(plan.clone()); - let plan_for_graph = graph.to_plan(); + let plan_for_graph = graph.to_plan(None).unwrap(); assert_eq!(plan, plan_for_graph); - let plan_by_index = graph.to_plan_with_index(HepNodeId::new(1)); - - assert_eq!(plan.childrens[0], plan_by_index); - Ok(()) } } diff --git a/src/optimizer/heuristic/matcher.rs b/src/optimizer/heuristic/matcher.rs index 56637ec5..f31d1682 100644 --- a/src/optimizer/heuristic/matcher.rs +++ b/src/optimizer/heuristic/matcher.rs @@ -102,13 +102,17 @@ mod tests { childrens: vec![LogicalPlan { operator: Operator::Dummy, childrens: vec![], + physical_option: None, }], + physical_option: None, }, LogicalPlan { operator: Operator::Dummy, childrens: vec![], + physical_option: None, }, ], + physical_option: None, }; let graph = HepGraph::new(all_dummy_plan.clone()); diff --git a/src/optimizer/heuristic/optimizer.rs b/src/optimizer/heuristic/optimizer.rs index d0a3daa0..d8edeee0 100644 --- a/src/optimizer/heuristic/optimizer.rs +++ b/src/optimizer/heuristic/optimizer.rs @@ -1,14 +1,19 @@ +use crate::optimizer::core::histogram::HistogramLoader; +use crate::optimizer::core::memo::Memo; use crate::optimizer::core::pattern::PatternMatcher; use crate::optimizer::core::rule::{MatchPattern, NormalizationRule}; use crate::optimizer::heuristic::batch::{HepBatch, HepBatchStrategy}; use crate::optimizer::heuristic::graph::{HepGraph, HepNodeId}; use crate::optimizer::heuristic::matcher::HepMatcher; +use crate::optimizer::rule::implementation::ImplementationRuleImpl; use crate::optimizer::rule::normalization::NormalizationRuleImpl; use crate::optimizer::OptimizerError; use crate::planner::LogicalPlan; +use crate::storage::Transaction; pub struct HepOptimizer { batches: Vec<HepBatch>, + memo: Option<Memo>, pub graph: HepGraph, } @@ -16,6 +21,7 @@ impl HepOptimizer { pub fn new(root: LogicalPlan) -> Self { Self { batches: vec![], + memo: None, graph: HepGraph::new(root), } } @@ -30,7 +36,17 @@ impl HepOptimizer { self } - pub fn find_best(&mut self) -> Result<LogicalPlan, OptimizerError> { + pub fn build_memo<T: Transaction>( + mut self, + loader: &HistogramLoader<'_, T>, + implementations: &[ImplementationRuleImpl], + )
-> Result<Self, OptimizerError> { + self.memo = Some(Memo::new(&self.graph, &loader, implementations)?); + + Ok(self) + } + + pub fn find_best(mut self) -> Result<LogicalPlan, OptimizerError> { let batches = self.batches.clone(); for batch in batches { @@ -46,7 +62,7 @@ impl HepOptimizer { } } - Ok(self.graph.to_plan()) + Ok(self.graph.to_plan(None).ok_or(OptimizerError::EmptyPlan)?) } fn apply_batch( diff --git a/src/optimizer/rule/implementation/dml/analyze.rs b/src/optimizer/rule/implementation/dml/analyze.rs new file mode 100644 index 00000000..35747811 --- /dev/null +++ b/src/optimizer/rule/implementation/dml/analyze.rs @@ -0,0 +1,27 @@ +use crate::optimizer::core::memo::{Expression, GroupExpression}; +use crate::optimizer::core::pattern::{Pattern, PatternChildrenPredicate}; +use crate::optimizer::core::rule::{ImplementationRule, MatchPattern}; +use crate::optimizer::rule::implementation::HistogramLoader; +use crate::optimizer::OptimizerError; +use crate::planner::operator::{Operator, PhysicalOption}; +use crate::single_mapping; +use crate::storage::Transaction; +use lazy_static::lazy_static; + +lazy_static! { + static ref ANALYZE_PATTERN: Pattern = { + Pattern { + predicate: |op| matches!(op, Operator::Analyze(_)), + children: PatternChildrenPredicate::None, + } + }; +} + +#[derive(Clone)] +pub struct AnalyzeImplementation; + +single_mapping!( + AnalyzeImplementation, + ANALYZE_PATTERN, + PhysicalOption::Analyze +); diff --git a/src/optimizer/rule/implementation/dml/mod.rs b/src/optimizer/rule/implementation/dml/mod.rs index fac541fe..094edda3 100644 --- a/src/optimizer/rule/implementation/dml/mod.rs +++ b/src/optimizer/rule/implementation/dml/mod.rs @@ -1,3 +1,4 @@ +pub(crate) mod analyze; pub(crate) mod copy_from_file; pub(crate) mod copy_to_file; pub(crate) mod delete; diff --git a/src/optimizer/rule/implementation/dql/scan.rs b/src/optimizer/rule/implementation/dql/scan.rs index abf9d8bc..a7afac96 100644 --- a/src/optimizer/rule/implementation/dql/scan.rs +++ b/src/optimizer/rule/implementation/dql/scan.rs @@ -43,7 +43,7 @@ impl ImplementationRule for SeqScanImplementation { } group_expr.append_expr(Expression { - ops: vec![PhysicalOption::SeqScan], + op: PhysicalOption::SeqScan, cost, }); @@ -88,7 +88,7 @@ impl ImplementationRule for IndexScanImplementation { } group_expr.append_expr(Expression { - ops: vec![PhysicalOption::IndexScan(index_info.clone())], + op: PhysicalOption::IndexScan(index_info.clone()), cost, }) } diff --git a/src/optimizer/rule/implementation/marcos.rs b/src/optimizer/rule/implementation/marcos.rs index a0b3cba5..0ac76dc6 100644 --- a/src/optimizer/rule/implementation/marcos.rs +++ b/src/optimizer/rule/implementation/marcos.rs @@ -16,7 +16,7 @@ macro_rules!
single_mapping { ) -> Result<(), OptimizerError> { //TODO: CostModel group_expr.append_expr(Expression { - ops: vec![$option], + op: $option, cost: None, }); diff --git a/src/optimizer/rule/implementation/mod.rs b/src/optimizer/rule/implementation/mod.rs index 426fad75..024bcd6d 100644 --- a/src/optimizer/rule/implementation/mod.rs +++ b/src/optimizer/rule/implementation/mod.rs @@ -12,6 +12,7 @@ use crate::optimizer::rule::implementation::ddl::create_table::CreateTableImplem use crate::optimizer::rule::implementation::ddl::drop_column::DropColumnImplementation; use crate::optimizer::rule::implementation::ddl::drop_table::DropTableImplementation; use crate::optimizer::rule::implementation::ddl::truncate::TruncateImplementation; +use crate::optimizer::rule::implementation::dml::analyze::AnalyzeImplementation; use crate::optimizer::rule::implementation::dml::copy_from_file::CopyFromFileImplementation; use crate::optimizer::rule::implementation::dml::copy_to_file::CopyToFileImplementation; use crate::optimizer::rule::implementation::dml::delete::DeleteImplementation; @@ -49,6 +50,7 @@ pub enum ImplementationRuleImpl { Sort, Values, // DML + Analyze, CopyFromFile, CopyToFile, Delete, @@ -86,6 +88,7 @@ impl MatchPattern for ImplementationRuleImpl { ImplementationRuleImpl::DropColumn => DropColumnImplementation.pattern(), ImplementationRuleImpl::DropTable => DropTableImplementation.pattern(), ImplementationRuleImpl::Truncate => TruncateImplementation.pattern(), + ImplementationRuleImpl::Analyze => AnalyzeImplementation.pattern(), } } } @@ -161,6 +164,9 @@ impl ImplementationRule for ImplementationRuleImpl { ImplementationRuleImpl::Truncate => { TruncateImplementation.to_expression(operator, loader, group_expr)? } + ImplementationRuleImpl::Analyze => { + AnalyzeImplementation.to_expression(operator, loader, group_expr)? 
+ } } Ok(()) diff --git a/src/planner/mod.rs b/src/planner/mod.rs index 6b0a5243..25106cc0 100644 --- a/src/planner/mod.rs +++ b/src/planner/mod.rs @@ -1,12 +1,13 @@ pub mod operator; use crate::catalog::TableName; -use crate::planner::operator::Operator; +use crate::planner::operator::{Operator, PhysicalOption}; #[derive(Debug, PartialEq, Eq, Clone, Hash)] pub struct LogicalPlan { pub operator: Operator, pub childrens: Vec<LogicalPlan>, + pub physical_option: Option<PhysicalOption>, } impl LogicalPlan { diff --git a/src/planner/operator/aggregate.rs b/src/planner/operator/aggregate.rs index c6ce8a9d..9017b079 100644 --- a/src/planner/operator/aggregate.rs +++ b/src/planner/operator/aggregate.rs @@ -19,6 +19,7 @@ impl AggregateOperator { agg_calls, }), childrens: vec![children], + physical_option: None, } } } diff --git a/src/planner/operator/filter.rs b/src/planner/operator/filter.rs index 41fd25aa..372e181a 100644 --- a/src/planner/operator/filter.rs +++ b/src/planner/operator/filter.rs @@ -16,6 +16,7 @@ impl FilterOperator { LogicalPlan { operator: Operator::Filter(FilterOperator { predicate, having }), childrens: vec![children], + physical_option: None, } } } diff --git a/src/planner/operator/join.rs b/src/planner/operator/join.rs index 459c147b..3f2b2122 100644 --- a/src/planner/operator/join.rs +++ b/src/planner/operator/join.rs @@ -38,6 +38,7 @@ impl JoinOperator { LogicalPlan { operator: Operator::Join(JoinOperator { on, join_type }), childrens: vec![left, right], + physical_option: None, } } } diff --git a/src/planner/operator/limit.rs b/src/planner/operator/limit.rs index c72ff1e7..9a2cb8f7 100644 --- a/src/planner/operator/limit.rs +++ b/src/planner/operator/limit.rs @@ -17,6 +17,7 @@ impl LimitOperator { LogicalPlan { operator: Operator::Limit(LimitOperator { offset, limit }), childrens: vec![children], + physical_option: None, } } } diff --git a/src/planner/operator/mod.rs b/src/planner/operator/mod.rs index 1841e842..5d6a7c98 100644 --- a/src/planner/operator/mod.rs +++ b/src/planner/operator/mod.rs @@ -71,7 +71,7 @@ pub enum Operator { CopyToFile(CopyToFileOperator), } -#[derive(Debug, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Hash)] pub enum PhysicalOption { Dummy, SimpleAggregate, @@ -96,6 +96,7 @@ pub enum PhysicalOption { Show, CopyFromFile, CopyToFile, + Analyze, } impl Operator { diff --git a/src/planner/operator/scan.rs b/src/planner/operator/scan.rs index bd96ff82..9128c59d 100644 --- a/src/planner/operator/scan.rs +++ b/src/planner/operator/scan.rs @@ -54,6 +54,7 @@ impl ScanOperator { limit: (None, None), }), childrens: vec![], + physical_option: None, } } } diff --git a/src/storage/kip.rs b/src/storage/kip.rs index 5469d29e..45c83ea6 100644 --- a/src/storage/kip.rs +++ b/src/storage/kip.rs @@ -1,5 +1,6 @@ use crate::catalog::{ColumnCatalog, ColumnRef, TableCatalog, TableMeta, TableName}; use crate::expression::simplify::ConstantBinary; +use crate::optimizer::core::histogram::{Histogram, HistogramLoader}; use crate::storage::table_codec::TableCodec; use crate::storage::{ tuple_projection, Bounds, IndexIter, Iter, Projections, Storage, StorageError, Transaction, @@ -21,6 +22,7 @@ use std::sync::Arc; #[derive(Clone)] pub struct KipStorage { pub inner: Arc<storage::KipStorage>, + pub(crate) histogram_cache: Arc<ShardingLruCache<TableName, Vec<Histogram>>>, } impl KipStorage { @@ -28,9 +30,11 @@ impl KipStorage { let storage = storage::KipStorage::open_with_config(Config::new(path).enable_level_0_memorization()) .await?; + let histogram_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new()).unwrap()); Ok(KipStorage { inner: Arc::new(storage),
+ histogram_cache, }) } } @@ -43,14 +47,16 @@ impl Storage for KipStorage { Ok(KipTransaction { tx, - cache: ShardingLruCache::new(8, 2, RandomState::default())?, + table_cache: ShardingLruCache::new(8, 2, RandomState::default())?, + histogram_cache: self.histogram_cache.clone(), }) } } pub struct KipTransaction { tx: mvcc::Transaction, - cache: ShardingLruCache<String, TableCatalog>, + table_cache: ShardingLruCache<String, TableCatalog>, + histogram_cache: Arc<ShardingLruCache<TableName, Vec<Histogram>>>, } impl Transaction for KipTransaction { @@ -200,7 +206,7 @@ impl Transaction for KipTransaction { let column = catalog.get_column_by_id(&col_id).unwrap(); let (key, value) = TableCodec::encode_column(&table_name, column)?; self.tx.set(key, value); - self.cache.remove(table_name); + self.table_cache.remove(table_name); Ok(col_id) } else { @@ -235,7 +241,7 @@ impl Transaction for KipTransaction { } err => err?, } - self.cache.remove(table_name); + self.table_cache.remove(table_name); Ok(()) } else { @@ -267,7 +273,7 @@ impl Transaction for KipTransaction { let (key, value) = TableCodec::encode_column(&table_name, column)?; self.tx.set(key, value); } - self.cache.put(table_name.to_string(), table_catalog); + self.table_cache.put(table_name.to_string(), table_catalog); Ok(table_name) } @@ -291,7 +297,7 @@ impl Transaction for KipTransaction { self.tx .remove(&TableCodec::encode_root_table_key(table_name))?; - let _ = self.cache.remove(&table_name.to_string()); + let _ = self.table_cache.remove(&table_name.to_string()); Ok(()) } @@ -307,7 +313,7 @@ impl Transaction for KipTransaction { } fn table(&self, table_name: TableName) -> Option<&TableCatalog> { - let mut option = self.cache.get(&table_name); + let mut option = self.table_cache.get(&table_name); if option.is_none() { // TODO: unify the data into a `Meta` prefix and use one iteration to collect all data @@ -315,7 +321,7 @@ impl Transaction for KipTransaction { if let Ok(catalog) = TableCatalog::reload(table_name.clone(), columns, indexes) { option = self - .cache + .table_cache .get_or_insert(table_name.to_string(), |_| Ok(catalog)) .ok(); } @@ -341,6 +347,7 @@ impl Transaction for KipTransaction { } fn save_meta(&mut self, table_meta: &TableMeta) -> Result<(), StorageError> { + let _ = self.histogram_cache.remove(&table_meta.table_name); let (key, value) = TableCodec::encode_root_table(table_meta)?; self.tx.set(key, value); @@ -360,6 +367,13 @@ impl Transaction for KipTransaction { Ok(vec![]) } + fn histogram_loader(&self) -> HistogramLoader<Self> + where + Self: Sized, + { + HistogramLoader::new(self, &self.histogram_cache) + } + async fn commit(self) -> Result<(), StorageError> { self.tx.commit().await?; diff --git a/src/storage/mod.rs b/src/storage/mod.rs index a04dd832..e79c9857 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -4,6 +4,7 @@ mod table_codec; use crate::catalog::{CatalogError, ColumnCatalog, TableCatalog, TableMeta, TableName}; use crate::expression::simplify::ConstantBinary; use crate::expression::ScalarExpression; +use crate::optimizer::core::histogram::HistogramLoader; use crate::storage::table_codec::TableCodec; use crate::types::errors::TypeError; use crate::types::index::{Index, IndexMetaRef}; @@ -96,6 +97,9 @@ pub trait Transaction: Sync + Send + 'static { fn table_metas(&self) -> Result<Vec<TableMeta>, StorageError>; fn save_meta(&mut self, table_meta: &TableMeta) -> Result<(), StorageError>; fn histogram_paths(&self, table_name: &str) -> Result<Vec<PathBuf>, StorageError>; + fn histogram_loader(&self) -> HistogramLoader<Self> + where + Self: Sized; #[allow(async_fn_in_trait)] async fn commit(self) -> Result<(),
StorageError>; diff --git a/tests/slt/analyze.slt b/tests/slt/analyze.slt new file mode 100644 index 00000000..79610b4b --- /dev/null +++ b/tests/slt/analyze.slt @@ -0,0 +1,48 @@ +statement ok +create table t(id int primary key, v1 bigint null, v2 varchar null, v3 decimal null) + +statement ok +insert into t values (0,1,10,100) + +statement ok +insert into t values (1,1,10,100), (2,2,20,200), (3,3,30,300), (4,4,40,400) + +statement ok +insert into t(id, v1, v2, v3) values (5,1,10,100) + +statement ok +insert into t(id, v1, v2) values (6,1,10) + +statement ok +insert into t(id, v2, v1) values (7,1,10) + +statement error +insert into t(id, v1, v2, v3) values (0) + +statement error +insert into t(id, v1, v2, v3) values (0, 0) + +statement error +insert into t(id, v1, v2, v3) values (0, 0, 0) + +statement ok +insert into t values (8,NULL,NULL,NULL) + +query IIII rowsort +select * from t +---- +0 1 10 100 +1 1 10 100 +2 2 20 200 +3 3 30 300 +4 4 40 400 +5 1 10 100 +6 1 10 null +7 10 1 null +8 null null null + +statement ok +analyze table t + +statement ok +drop table t \ No newline at end of file
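
A minimal usage sketch of the CBO path wired up above (illustrative only, not part of the patch: the `Database::with_kipdb` constructor, the crate name `kip_sql`, and the data path are assumptions; the `run` API and the `analyze table` statement are taken from src/db.rs and tests/slt/analyze.slt). Running `analyze table` persists per-column histograms, after which `build_plan` builds the memo and attaches the cheapest `PhysicalOption` to each plan node:

    use kip_sql::db::{Database, DatabaseError};

    #[tokio::main]
    async fn main() -> Result<(), DatabaseError> {
        // Constructor name and data path are assumptions for illustration.
        let db = Database::with_kipdb("./data").await?;

        db.run("create table t1 (c1 int primary key, c2 int)").await?;
        db.run("insert into t1 values (1, 10), (2, 20), (3, 30)").await?;

        // `analyze table` writes per-column histograms to disk; Memo::new later reads
        // them back through Transaction::histogram_loader() to cost SeqScan vs IndexScan.
        db.run("analyze table t1").await?;

        // With statistics available, the optimizer can choose PhysicalOption::IndexScan
        // for the primary-key predicate instead of a full sequential scan.
        let _tuples = db.run("select * from t1 where c1 = 2").await?;
        Ok(())
    }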