Skip to content

Commit

Permalink
feat: completed the integration of CBO into the optimizer(currently o…
Browse files Browse the repository at this point in the history
…nly single column index selection is supported)
  • Loading branch information
KKould committed Jan 26, 2024
1 parent 738810a commit b7222af
Show file tree
Hide file tree
Showing 36 changed files with 285 additions and 89 deletions.
2 changes: 2 additions & 0 deletions src/binder/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
column,
}),
childrens: vec![plan],
physical_option: None,
}
}
AlterTableOperation::DropColumn {
Expand All @@ -58,6 +59,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
column_name,
}),
childrens: vec![plan],
physical_option: None,
}
}
AlterTableOperation::DropPrimaryKey => todo!(),
Expand Down
1 change: 1 addition & 0 deletions src/binder/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
columns,
}),
childrens: vec![scan_op],
physical_option: None,
};
Ok(plan)
}
Expand Down
2 changes: 2 additions & 0 deletions src/binder/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
Ok(LogicalPlan {
operator: Operator::CopyToFile(CopyToFileOperator { source: ext_source }),
childrens: vec![],
physical_option: None,
})
} else {
// COPY <dest_table> FROM <source_file>
Expand All @@ -95,6 +96,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
table: table_name.to_string(),
}),
childrens: vec![],
physical_option: None,
})
}
} else {
Expand Down
1 change: 1 addition & 0 deletions src/binder/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
if_not_exists,
}),
childrens: vec![],
physical_option: None,
};
Ok(plan)
}
Expand Down
1 change: 1 addition & 0 deletions src/binder/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
primary_key_column,
}),
childrens: vec![plan],
physical_option: None,
})
} else {
unreachable!("only table")
Expand Down
1 change: 1 addition & 0 deletions src/binder/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
if_exists: *if_exists,
}),
childrens: vec![],
physical_option: None,
};
Ok(plan)
}
Expand Down
2 changes: 2 additions & 0 deletions src/binder/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
is_overwrite,
}),
childrens: vec![values_plan],
physical_option: None,
})
} else {
Err(BindError::InvalidTable(format!(
Expand All @@ -103,6 +104,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
LogicalPlan {
operator: Operator::Values(ValuesOperator { rows, columns }),
childrens: vec![],
physical_option: None,
}
}
}
3 changes: 3 additions & 0 deletions src/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
return Ok(LogicalPlan {
operator: Operator::Dummy,
childrens: vec![],
physical_option: None,
});
}

Expand Down Expand Up @@ -340,6 +341,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
LogicalPlan {
operator: Operator::Project(ProjectOperator { exprs: select_list }),
childrens: vec![children],
physical_option: None,
}
}

Expand All @@ -350,6 +352,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
limit: None,
}),
childrens: vec![children],
physical_option: None,
}
}

Expand Down
1 change: 1 addition & 0 deletions src/binder/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
let plan = LogicalPlan {
operator: Operator::Show(ShowTablesOperator {}),
childrens: vec![],
physical_option: None,
};
Ok(plan)
}
Expand Down
1 change: 1 addition & 0 deletions src/binder/truncate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
let plan = LogicalPlan {
operator: Operator::Truncate(TruncateOperator { table_name }),
childrens: vec![],
physical_option: None,
};
Ok(plan)
}
Expand Down
1 change: 1 addition & 0 deletions src/binder/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
Ok(LogicalPlan {
operator: Operator::Update(UpdateOperator { table_name }),
childrens: vec![plan, values_plan],
physical_option: None,
})
} else {
unreachable!("only table")
Expand Down
51 changes: 44 additions & 7 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ use std::path::PathBuf;
use crate::binder::{BindError, Binder, BinderContext};
use crate::execution::volcano::{build_stream, try_collect};
use crate::execution::ExecutorError;
use crate::optimizer::core::histogram::HistogramLoader;
use crate::optimizer::heuristic::batch::HepBatchStrategy;
use crate::optimizer::heuristic::optimizer::HepOptimizer;
use crate::optimizer::rule::implementation::ImplementationRuleImpl;
use crate::optimizer::rule::normalization::NormalizationRuleImpl;
use crate::optimizer::OptimizerError;
use crate::parser::parse_sql;
Expand Down Expand Up @@ -81,7 +83,7 @@ impl<S: Storage> Database<S> {
/// Run SQL queries.
pub async fn run<T: AsRef<str>>(&self, sql: T) -> Result<Vec<Tuple>, DatabaseError> {
let transaction = self.storage.transaction().await?;
let (plan, _) = Self::build_plan(sql, &transaction)?;
let (plan, _) = Self::build_plan::<T, S::TransactionType>(sql, &transaction)?;

Self::run_volcano(transaction, plan).await
}
Expand All @@ -107,8 +109,8 @@ impl<S: Storage> Database<S> {
})
}

pub fn build_plan<T: AsRef<str>>(
sql: T,
pub fn build_plan<V: AsRef<str>, T: Transaction>(
sql: V,
transaction: &<S as Storage>::TransactionType,
) -> Result<(LogicalPlan, Statement), DatabaseError> {
// parse
Expand All @@ -127,13 +129,17 @@ impl<S: Storage> Database<S> {
let source_plan = binder.bind(&stmts[0])?;
// println!("source_plan plan: {:#?}", source_plan);

let best_plan = Self::default_optimizer(source_plan).find_best()?;
let best_plan =
Self::default_optimizer(source_plan, &transaction.histogram_loader())?.find_best()?;
// println!("best_plan plan: {:#?}", best_plan);

Ok((best_plan, stmts.remove(0)))
}

pub(crate) fn default_optimizer(source_plan: LogicalPlan) -> HepOptimizer {
pub(crate) fn default_optimizer<T: Transaction>(
source_plan: LogicalPlan,
loader: &HistogramLoader<'_, T>,
) -> Result<HepOptimizer, OptimizerError> {
HepOptimizer::new(source_plan)
.batch(
"Column Pruning".to_string(),
Expand Down Expand Up @@ -174,6 +180,36 @@ impl<S: Storage> Database<S> {
NormalizationRuleImpl::EliminateLimits,
],
)
.build_memo(
loader,
&[
// DQL
ImplementationRuleImpl::SimpleAggregate,
ImplementationRuleImpl::GroupByAggregate,
ImplementationRuleImpl::Dummy,
ImplementationRuleImpl::Filter,
ImplementationRuleImpl::HashJoin,
ImplementationRuleImpl::Limit,
ImplementationRuleImpl::Projection,
ImplementationRuleImpl::SeqScan,
ImplementationRuleImpl::IndexScan,
ImplementationRuleImpl::Sort,
ImplementationRuleImpl::Values,
// DML
ImplementationRuleImpl::Analyze,
ImplementationRuleImpl::CopyFromFile,
ImplementationRuleImpl::CopyToFile,
ImplementationRuleImpl::Delete,
ImplementationRuleImpl::Insert,
ImplementationRuleImpl::Update,
// DLL
ImplementationRuleImpl::AddColumn,
ImplementationRuleImpl::CreateTable,
ImplementationRuleImpl::DropColumn,
ImplementationRuleImpl::DropTable,
ImplementationRuleImpl::Truncate,
],
)
}
}

Expand All @@ -183,8 +219,9 @@ pub struct DBTransaction<S: Storage> {

impl<S: Storage> DBTransaction<S> {
pub async fn run<T: AsRef<str>>(&mut self, sql: T) -> Result<Vec<Tuple>, DatabaseError> {
let (plan, _) =
Database::<S>::build_plan(sql, unsafe { self.inner.as_ptr().as_ref().unwrap() })?;
let (plan, _) = Database::<S>::build_plan::<T, S::TransactionType>(sql, unsafe {
self.inner.as_ptr().as_ref().unwrap()
})?;
let mut stream = build_stream(plan, &self.inner);

Ok(try_collect(&mut stream).await?)
Expand Down
9 changes: 7 additions & 2 deletions src/execution/volcano/dml/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::catalog::{ColumnCatalog, ColumnRef, TableMeta, TableName};
use crate::execution::volcano::{BoxedExecutor, Executor};
use crate::execution::ExecutorError;
use crate::optimizer::core::histogram::HistogramBuilder;
use crate::optimizer::OptimizerError;
use crate::planner::operator::analyze::AnalyzeOperator;
use crate::storage::Transaction;
use crate::types::tuple::Tuple;
Expand Down Expand Up @@ -89,11 +90,15 @@ impl Analyze {
.join(ts.to_string());
fs::create_dir_all(&dir_path)?;

let mut meta = TableMeta::empty(table_name);
let mut meta = TableMeta::empty(table_name.clone());

for (column_id, builder) in builders {
let path = dir_path.join(column_id.unwrap().to_string());
let histogram = builder.build(DEFAULT_NUM_OF_BUCKETS)?;
let histogram = match builder.build(DEFAULT_NUM_OF_BUCKETS) {
Ok(histogram) => histogram,
Err(OptimizerError::TooManyBuckets) => continue,
err => err?,
};

histogram.to_file(&path)?;

Expand Down
26 changes: 10 additions & 16 deletions src/execution/volcano/dql/index_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,21 @@ use crate::execution::volcano::{BoxedExecutor, Executor};
use crate::execution::ExecutorError;
use crate::planner::operator::scan::ScanOperator;
use crate::storage::{Iter, Transaction};
use crate::types::index::IndexInfo;
use crate::types::index::IndexMetaRef;
use crate::types::tuple::Tuple;
use futures_async_stream::try_stream;
use std::cell::RefCell;
use crate::expression::simplify::ConstantBinary;

pub(crate) struct IndexScan {
op: ScanOperator,
index_by: IndexMetaRef,
binaries: Vec<ConstantBinary>,
}

impl From<ScanOperator> for IndexScan {
fn from(op: ScanOperator) -> Self {
IndexScan { op }
impl From<(ScanOperator, IndexMetaRef, Vec<ConstantBinary>)> for IndexScan {
fn from((op, index_by, binaries): (ScanOperator, IndexMetaRef, Vec<ConstantBinary>)) -> Self {
IndexScan { op, index_by, binaries }
}
}

Expand All @@ -30,21 +33,12 @@ impl IndexScan {
table_name,
columns,
limit,
mut index_infos,
..
} = self.op;
if let Some(IndexInfo {
meta,
binaries: Some(binaries),
}) = index_infos.pop()
{
let mut iter = transaction.read_by_index(table_name, limit, columns, meta, binaries)?;
let mut iter = transaction.read_by_index(table_name, limit, columns, self.index_by, self.binaries)?;

while let Some(tuple) = iter.next_tuple()? {
yield tuple;
}
} else {
return Err(ExecutorError::InvalidIndex);
while let Some(tuple) = iter.next_tuple()? {
yield tuple;
}
}
}
17 changes: 9 additions & 8 deletions src/execution/volcano/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ use crate::execution::volcano::dql::sort::Sort;
use crate::execution::volcano::dql::values::Values;
use crate::execution::volcano::show::show_table::ShowTables;
use crate::execution::ExecutorError;
use crate::planner::operator::Operator;
use crate::planner::operator::{Operator, PhysicalOption};
use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use crate::types::tuple::Tuple;
use futures::stream::BoxStream;
use futures::TryStreamExt;
use std::cell::RefCell;
use crate::execution::volcano::dql::index_scan::IndexScan;
use crate::types::index::IndexInfo;

use self::ddl::add_column::AddColumn;

Expand All @@ -44,6 +46,7 @@ pub fn build_stream<T: Transaction>(plan: LogicalPlan, transaction: &RefCell<T>)
let LogicalPlan {
operator,
mut childrens,
..
} = plan;

match operator {
Expand Down Expand Up @@ -74,13 +77,11 @@ pub fn build_stream<T: Transaction>(plan: LogicalPlan, transaction: &RefCell<T>)
Projection::from((op, input)).execute(transaction)
}
Operator::Scan(op) => {
SeqScan::from(op).execute(transaction)
// Fixme
// if op.index_infos.is_empty() {
// SeqScan::from(op).execute(transaction)
// } else {
// IndexScan::from(op).execute(transaction)
// }
if let Some(PhysicalOption::IndexScan(IndexInfo { meta, binaries: Some(binaries) })) = plan.physical_option {
IndexScan::from((op, meta, binaries)).execute(transaction)
} else {
SeqScan::from(op).execute(transaction)
}
}
Operator::Sort(op) => {
let input = build_stream(childrens.remove(0), transaction);
Expand Down
14 changes: 7 additions & 7 deletions src/optimizer/core/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::collections::Bound;
use std::fs::OpenOptions;
use std::hash::RandomState;
use std::io::{Read, Write};
use std::path::Path;
use std::sync::Arc;
Expand All @@ -31,13 +30,13 @@ pub struct HistogramBuilder {
}

pub struct HistogramLoader<'a, T: Transaction> {
cache: ShardingLruCache<TableName, Vec<Histogram>>,
cache: &'a ShardingLruCache<TableName, Vec<Histogram>>,
tx: &'a T,
}

// Equal depth histogram
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub(crate) struct Histogram {
pub struct Histogram {
column_id: ColumnId,
data_type: LogicalType,

Expand Down Expand Up @@ -158,10 +157,11 @@ impl HistogramBuilder {
}

impl<'a, T: Transaction> HistogramLoader<'a, T> {
pub fn new(tx: &'a T) -> Result<HistogramLoader<T>, OptimizerError> {
let cache = ShardingLruCache::new(128, 16, RandomState::new())?;

Ok(HistogramLoader { cache, tx })
pub fn new(
tx: &'a T,
cache: &'a ShardingLruCache<TableName, Vec<Histogram>>,
) -> HistogramLoader<'a, T> {
HistogramLoader { cache, tx }
}

pub fn load(&self, table_name: TableName) -> Result<&Vec<Histogram>, OptimizerError> {
Expand Down
Loading

0 comments on commit b7222af

Please sign in to comment.