Skip to content

Commit

Permalink
feat: impl Explain
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Jan 31, 2024
1 parent ad7cbb5 commit 22eb0b4
Show file tree
Hide file tree
Showing 51 changed files with 869 additions and 55 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ Cargo.lock
/transaction

fncksql_bench
sqlite_bench
sqlite_bench

tests/data/row_20000.csv
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

[package]
name = "fnck_sql"
version = "0.0.1-alpha.9"
version = "0.0.1-alpha.10"
edition = "2021"
authors = ["Kould <[email protected]>", "Xwg <[email protected]>"]
description = "Fast Insert OLTP SQL DBMS"
Expand Down
2 changes: 1 addition & 1 deletion src/binder/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ mod tests {
let transaction = storage.transaction().await?;

let sql = "create table t1 (id int primary key, name varchar(10) null)";
let binder = Binder::new(BinderContext::new(&transaction));
let mut binder = Binder::new(BinderContext::new(&transaction));
let stmt = crate::parser::parse_sql(sql).unwrap();
let plan1 = binder.bind(&stmt[0]).unwrap();

Expand Down
14 changes: 14 additions & 0 deletions src/binder/explain.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use crate::binder::{BindError, Binder};
use crate::planner::operator::Operator;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;

impl<'a, T: Transaction> Binder<'a, T> {
pub(crate) fn bind_explain(&mut self, plan: LogicalPlan) -> Result<LogicalPlan, BindError> {
Ok(LogicalPlan {
operator: Operator::Explain,
childrens: vec![plan],
physical_option: None,
})
}
}
10 changes: 8 additions & 2 deletions src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod create_table;
mod delete;
mod distinct;
mod drop_table;
mod explain;
pub mod expr;
mod insert;
mod select;
Expand Down Expand Up @@ -118,7 +119,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
Binder { context }
}

pub fn bind(mut self, stmt: &Statement) -> Result<LogicalPlan, BindError> {
pub fn bind(&mut self, stmt: &Statement) -> Result<LogicalPlan, BindError> {
let plan = match stmt {
Statement::Query(query) => self.bind_query(query)?,
Statement::AlterTable { name, operation } => self.bind_alter_table(name, operation)?,
Expand Down Expand Up @@ -184,6 +185,11 @@ impl<'a, T: Transaction> Binder<'a, T> {
options,
..
} => self.bind_copy(source.clone(), *to, target.clone(), options)?,
Statement::Explain { statement, .. } => {
let plan = self.bind(statement)?;

self.bind_explain(plan)?
}
_ => return Err(BindError::UnsupportedStmt(stmt.to_string())),
};
Ok(plan)
Expand Down Expand Up @@ -308,7 +314,7 @@ pub mod test {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let storage = build_test_catalog(temp_dir.path()).await?;
let transaction = storage.transaction().await?;
let binder = Binder::new(BinderContext::new(&transaction));
let mut binder = Binder::new(BinderContext::new(&transaction));
let stmt = crate::parser::parse_sql(sql)?;

Ok(binder.bind(&stmt[0])?)
Expand Down
3 changes: 1 addition & 2 deletions src/binder/show.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use crate::binder::{BindError, Binder};
use crate::planner::operator::show::ShowTablesOperator;
use crate::planner::operator::Operator;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;

impl<'a, T: Transaction> Binder<'a, T> {
pub(crate) fn bind_show_tables(&mut self) -> Result<LogicalPlan, BindError> {
let plan = LogicalPlan {
operator: Operator::Show(ShowTablesOperator {}),
operator: Operator::Show,
childrens: vec![],
physical_option: None,
};
Expand Down
2 changes: 1 addition & 1 deletion src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl<S: Storage> Database<S> {
if stmts.is_empty() {
return Err(DatabaseError::EmptyStatement);
}
let binder = Binder::new(BinderContext::new(transaction));
let mut binder = Binder::new(BinderContext::new(transaction));
/// Build a logical plan.
///
/// SELECT a,b FROM t1 ORDER BY a LIMIT 1;
Expand Down
2 changes: 1 addition & 1 deletion src/execution/codegen/dql/index_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl KipChannelIndexNext {

let ScanOperator {
table_name,
columns,
projection_columns: columns,
limit,
index_by,
..
Expand Down
2 changes: 1 addition & 1 deletion src/execution/codegen/dql/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl KipChannelSeqNext {

let ScanOperator {
table_name,
columns,
projection_columns: columns,
limit,
..
} = op;
Expand Down
4 changes: 2 additions & 2 deletions src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@ pub enum ExecutorError {
),
#[error("Internal error: {0}")]
InternalError(String),
#[error("io error")]
#[error("io error: {0}")]
Io(
#[from]
#[source]
std::io::Error,
),
#[error("csv error")]
#[error("csv error: {0}")]
Csv(
#[from]
#[source]
Expand Down
17 changes: 16 additions & 1 deletion src/execution/volcano/dml/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ use crate::types::value::DataValue;
use futures_async_stream::try_stream;
use itertools::Itertools;
use std::collections::HashMap;
use std::fs;
use std::fmt::Formatter;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{fmt, fs};

const DEFAULT_NUM_OF_BUCKETS: usize = 100;
const DEFAULT_COLUMN_METAS_PATH: &str = "fnck_sql_column_metas";
Expand Down Expand Up @@ -123,3 +124,17 @@ impl Analyze {
};
}
}

impl fmt::Display for AnalyzeOperator {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
let columns = self
.columns
.iter()
.map(|column| format!("{}", column.name()))
.join(", ");

write!(f, "Analyze {} -> [{}]", self.table_name, columns)?;

Ok(())
}
}
41 changes: 41 additions & 0 deletions src/execution/volcano/dql/explain.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use crate::catalog::ColumnCatalog;
use crate::catalog::ColumnRef;
use crate::execution::volcano::{BoxedExecutor, ReadExecutor};
use crate::execution::ExecutorError;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use crate::types::tuple::Tuple;
use crate::types::value::DataValue;
use crate::types::value::ValueRef;
use futures_async_stream::try_stream;
use std::sync::Arc;

pub struct Explain {
plan: LogicalPlan,
}

impl From<LogicalPlan> for Explain {
fn from(plan: LogicalPlan) -> Self {
Explain { plan }
}
}

impl<T: Transaction> ReadExecutor<T> for Explain {
fn execute(self, _: &T) -> BoxedExecutor {
self._execute()
}
}

impl Explain {
#[try_stream(boxed, ok = Tuple, error = ExecutorError)]
pub async fn _execute(self) {
let columns: Vec<ColumnRef> = vec![Arc::new(ColumnCatalog::new_dummy("PLAN".to_string()))];
let values: Vec<ValueRef> = vec![Arc::new(DataValue::Utf8(Some(self.plan.explain(0))))];

yield Tuple {
id: None,
columns,
values,
};
}
}
2 changes: 1 addition & 1 deletion src/execution/volcano/dql/index_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl IndexScan {
pub async fn _execute<T: Transaction>(self, transaction: &T) {
let ScanOperator {
table_name,
columns,
projection_columns: columns,
limit,
..
} = self.op;
Expand Down
2 changes: 2 additions & 0 deletions src/execution/volcano/dql/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
pub(crate) mod aggregate;
pub(crate) mod dummy;
pub(crate) mod explain;
pub(crate) mod filter;
pub(crate) mod index_scan;
pub(crate) mod join;
pub(crate) mod limit;
pub(crate) mod projection;
pub(crate) mod seq_scan;
pub(crate) mod show_table;
pub(crate) mod sort;
pub(crate) mod values;

Expand Down
2 changes: 1 addition & 1 deletion src/execution/volcano/dql/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl SeqScan {
pub async fn _execute<T: Transaction>(self, transaction: &T) {
let ScanOperator {
table_name,
columns,
projection_columns: columns,
limit,
..
} = self.op;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,13 @@ use crate::catalog::ColumnRef;
use crate::catalog::{ColumnCatalog, TableMeta};
use crate::execution::volcano::{BoxedExecutor, ReadExecutor};
use crate::execution::ExecutorError;
use crate::planner::operator::show::ShowTablesOperator;
use crate::storage::Transaction;
use crate::types::tuple::Tuple;
use crate::types::value::{DataValue, ValueRef};
use futures_async_stream::try_stream;
use std::sync::Arc;

pub struct ShowTables {
_op: ShowTablesOperator,
}

impl From<ShowTablesOperator> for ShowTables {
fn from(op: ShowTablesOperator) -> Self {
ShowTables { _op: op }
}
}
pub struct ShowTables;

impl<T: Transaction> ReadExecutor<T> for ShowTables {
fn execute(self, transaction: &T) -> BoxedExecutor {
Expand Down
11 changes: 8 additions & 3 deletions src/execution/volcano/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
pub(crate) mod ddl;
pub(crate) mod dml;
pub(crate) mod dql;
pub(crate) mod show;

use crate::execution::volcano::ddl::create_table::CreateTable;
use crate::execution::volcano::ddl::drop_column::DropColumn;
Expand All @@ -15,15 +14,16 @@ use crate::execution::volcano::dml::update::Update;
use crate::execution::volcano::dql::aggregate::hash_agg::HashAggExecutor;
use crate::execution::volcano::dql::aggregate::simple_agg::SimpleAggExecutor;
use crate::execution::volcano::dql::dummy::Dummy;
use crate::execution::volcano::dql::explain::Explain;
use crate::execution::volcano::dql::filter::Filter;
use crate::execution::volcano::dql::index_scan::IndexScan;
use crate::execution::volcano::dql::join::hash_join::HashJoin;
use crate::execution::volcano::dql::limit::Limit;
use crate::execution::volcano::dql::projection::Projection;
use crate::execution::volcano::dql::seq_scan::SeqScan;
use crate::execution::volcano::dql::show_table::ShowTables;
use crate::execution::volcano::dql::sort::Sort;
use crate::execution::volcano::dql::values::Values;
use crate::execution::volcano::show::show_table::ShowTables;
use crate::execution::ExecutorError;
use crate::planner::operator::{Operator, PhysicalOption};
use crate::planner::LogicalPlan;
Expand Down Expand Up @@ -101,7 +101,12 @@ pub fn build_read<T: Transaction>(plan: LogicalPlan, transaction: &T) -> BoxedEx
Limit::from((op, input)).execute(transaction)
}
Operator::Values(op) => Values::from(op).execute(transaction),
Operator::Show(op) => ShowTables::from(op).execute(transaction),
Operator::Show => ShowTables.execute(transaction),
Operator::Explain => {
let input = childrens.remove(0);

Explain::from(input).execute(transaction)
}
_ => unreachable!(),
}
}
Expand Down
1 change: 0 additions & 1 deletion src/execution/volcano/show/mod.rs

This file was deleted.

6 changes: 6 additions & 0 deletions src/expression/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,12 @@ pub enum BinaryOperator {
Xor,
}

impl fmt::Display for ScalarExpression {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(f, "{}", self.output_column().name())
}
}

impl fmt::Display for BinaryOperator {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
match self {
Expand Down
40 changes: 38 additions & 2 deletions src/expression/simplify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ use ahash::RandomState;
use itertools::Itertools;
use std::cmp::Ordering;
use std::collections::{Bound, HashSet};
use std::mem;
use std::fmt::Formatter;
use std::sync::Arc;
use std::{fmt, mem};

#[derive(Debug, PartialEq, Eq, Clone, Hash)]
pub enum ConstantBinary {
Expand Down Expand Up @@ -377,6 +378,13 @@ impl ConstantBinary {
.chain(eqs.into_iter().map(|val| ConstantBinary::Eq(val.clone())))
.collect_vec()
}

fn join_write(f: &mut Formatter, binaries: &Vec<ConstantBinary>, op: &str) -> fmt::Result {
let binaries = binaries.iter().map(|binary| format!("{}", binary)).join(op);
write!(f, " {} ", binaries)?;

Ok(())
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -871,7 +879,7 @@ impl ScalarExpression {
| ScalarExpression::In { expr, .. } => expr.convert_binary(col_id),
ScalarExpression::IsNull { expr, negated, .. } => match expr.as_ref() {
ScalarExpression::ColumnRef(column) => {
Ok((column.id() == column.id()).then(|| {
Ok(column.id().is_some_and(|id| col_id == &id).then(|| {
if *negated {
ConstantBinary::NotEq(NULL_VALUE.clone())
} else {
Expand Down Expand Up @@ -954,6 +962,34 @@ impl ScalarExpression {
}
}

impl fmt::Display for ConstantBinary {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
match self {
ConstantBinary::Scope { min, max } => {
match min {
Bound::Unbounded => write!(f, "-∞")?,
Bound::Included(value) => write!(f, "[{}", value)?,
Bound::Excluded(value) => write!(f, "({}", value)?,
}

write!(f, ", ")?;

match max {
Bound::Unbounded => write!(f, "+∞")?,
Bound::Included(value) => write!(f, "{}]", value)?,
Bound::Excluded(value) => write!(f, "{})", value)?,
}

Ok(())
}
ConstantBinary::Eq(value) => write!(f, "{}", value),
ConstantBinary::NotEq(value) => write!(f, "!{}", value),
ConstantBinary::And(binaries) => Self::join_write(f, binaries, " AND "),
ConstantBinary::Or(binaries) => Self::join_write(f, binaries, " OR "),
}
}
}

#[cfg(test)]
mod test {
use crate::catalog::{ColumnCatalog, ColumnDesc, ColumnSummary};
Expand Down
2 changes: 1 addition & 1 deletion src/optimizer/core/memo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ mod tests {
database.run("analyze table t1").await?;

let transaction = database.storage.transaction().await?;
let binder = Binder::new(BinderContext::new(&transaction));
let mut binder = Binder::new(BinderContext::new(&transaction));
let stmt = crate::parser::parse_sql(
// FIXME: Only by bracketing (c1 > 40 or c1 = 2) can the filter be pushed down below the join
"select c1, c3 from t1 inner join t2 on c1 = c3 where (c1 > 40 or c1 = 2) and c3 > 22",
Expand Down
Loading

0 comments on commit 22eb0b4

Please sign in to comment.