From a7549690a627108c730dae7bef2cd65c1ecf8c47 Mon Sep 17 00:00:00 2001
From: Kould
Date: Sun, 3 Nov 2024 01:55:36 +0800
Subject: [PATCH] feat: impl `View` as data source

---
 rust-toolchain                             |   2 +-
 src/binder/aggregate.rs                    |   2 +-
 src/binder/alter_table.rs                  |   4 +-
 src/binder/analyze.rs                      |  20 +-
 src/binder/copy.rs                         |   4 +-
 src/binder/create_index.rs                 |  14 +-
 src/binder/create_table.rs                 |   4 +-
 src/binder/create_view.rs                  |   2 +-
 src/binder/delete.rs                       |  21 +-
 src/binder/describe.rs                     |   2 +-
 src/binder/distinct.rs                     |   2 +-
 src/binder/drop_table.rs                   |   7 +-
 src/binder/explain.rs                      |   2 +-
 src/binder/expr.rs                         |  82 ++++---
 src/binder/insert.rs                       |  10 +-
 src/binder/mod.rs                          | 151 ++++++++++--
 src/binder/select.rs                       |  90 ++++---
 src/binder/show.rs                         |   2 +-
 src/binder/truncate.rs                     |   2 +-
 src/binder/update.rs                       |   2 +-
 src/catalog/table.rs                       |   1 +
 src/db.rs                                  |  21 +-
 src/errors.rs                              |   4 +-
 src/execution/ddl/add_column.rs            |   7 +-
 src/execution/ddl/create_index.rs          |   4 +-
 src/execution/ddl/create_table.rs          |   4 +-
 src/execution/ddl/create_view.rs           |  12 +-
 src/execution/ddl/drop_column.rs           |   6 +-
 src/execution/ddl/drop_table.rs            |   4 +-
 src/execution/ddl/truncate.rs              |   4 +-
 src/execution/dml/analyze.rs               |  11 +-
 src/execution/dml/copy_from_file.rs        |  10 +-
 src/execution/dml/delete.rs                |   7 +-
 src/execution/dml/insert.rs                |   9 +-
 src/execution/dml/update.rs                |   7 +-
 src/execution/dql/aggregate/hash_agg.rs    |   7 +-
 src/execution/dql/aggregate/simple_agg.rs  |   4 +-
 src/execution/dql/describe.rs              |   7 +-
 src/execution/dql/dummy.rs                 |   8 +-
 src/execution/dql/explain.rs               |   8 +-
 src/execution/dql/filter.rs                |   4 +-
 src/execution/dql/function_scan.rs         |   8 +-
 src/execution/dql/index_scan.rs            |   4 +-
 src/execution/dql/join/hash_join.rs        |  33 ++-
 src/execution/dql/join/nested_loop_join.rs |  37 ++-
 src/execution/dql/limit.rs                 |   4 +-
 src/execution/dql/projection.rs            |   4 +-
 src/execution/dql/seq_scan.rs              |   4 +-
 src/execution/dql/show_table.rs            |   4 +-
 src/execution/dql/sort.rs                  |   4 +-
 src/execution/dql/union.rs                 |   4 +-
 src/execution/dql/values.rs                |   8 +-
 src/execution/mod.rs                       |  12 +-
 src/expression/evaluator.rs                |  12 +-
 src/expression/mod.rs                      |   2 +-
 src/function/numbers.rs                    |   2 +-
 src/lib.rs                                 |   1 -
 src/optimizer/core/memo.rs                 |   3 +-
 .../rule/normalization/pushdown_limit.rs   |   2 +-
 src/planner/mod.rs                         | 227 ++++++++++--------
 src/planner/operator/drop_table.rs         |   6 +-
 src/serdes/column.rs                       |   4 +-
 src/storage/mod.rs                         |  92 ++++---
 src/storage/rocksdb.rs                     |   6 +-
 src/types/value.rs                         |   6 +-
 src/utils/lru.rs                           |   4 +-
 66 files changed, 659 insertions(+), 408 deletions(-)

diff --git a/rust-toolchain b/rust-toolchain
index 7df939e8..4e42420d 100644
--- a/rust-toolchain
+++ b/rust-toolchain
@@ -1 +1 @@
-nightly-2024-04-27
\ No newline at end of file
+nightly-2024-10-10
\ No newline at end of file
diff --git a/src/binder/aggregate.rs b/src/binder/aggregate.rs
index f333a0c2..3cd1e63a 100644
--- a/src/binder/aggregate.rs
+++ b/src/binder/aggregate.rs
@@ -14,7 +14,7 @@ use crate::{
 
 use super::{Binder, QueryBindStep};
 
-impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
+impl<T: Transaction> Binder<'_, '_, T> {
     pub fn bind_aggregate(
         &mut self,
         children: LogicalPlan,
diff --git a/src/binder/alter_table.rs b/src/binder/alter_table.rs
index 798bf963..f30a5063 100644
--- a/src/binder/alter_table.rs
+++ b/src/binder/alter_table.rs
@@ -12,7 +12,7 @@ use crate::planner::operator::Operator;
 use crate::planner::LogicalPlan;
 use crate::storage::Transaction;
 
-impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
+impl<T: Transaction> Binder<'_, '_, T> {
     pub(crate) fn bind_alter_table(
         &mut self,
         name: &ObjectName,
@@ -21,7 +21,7 @@ impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
         let table_name: Arc<String> = Arc::new(lower_case_name(name)?);
         let table = self
             .context
-            .table(table_name.clone())
+            .table(table_name.clone())?
             .ok_or(DatabaseError::TableNotFound)?;
         let plan = match operation {
             AlterTableOperation::AddColumn {
diff --git a/src/binder/analyze.rs b/src/binder/analyze.rs
index 321e0559..3511fa28 100644
--- a/src/binder/analyze.rs
+++ b/src/binder/analyze.rs
@@ -1,4 +1,4 @@
-use crate::binder::{lower_case_name, Binder};
+use crate::binder::{lower_case_name, Binder, Source};
 use crate::errors::DatabaseError;
 use crate::planner::operator::analyze::AnalyzeOperator;
 use crate::planner::operator::table_scan::TableScanOperator;
@@ -8,16 +8,24 @@ use crate::storage::Transaction;
 use sqlparser::ast::ObjectName;
 use std::sync::Arc;
 
-impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
+impl<T: Transaction> Binder<'_, '_, T> {
     pub(crate) fn bind_analyze(&mut self, name: &ObjectName) -> Result<LogicalPlan, DatabaseError> {
         let table_name = Arc::new(lower_case_name(name)?);
 
-        let table_catalog = self
+        let table = self
             .context
-            .table_and_bind(table_name.clone(), None, None)?;
-        let index_metas = table_catalog.indexes.clone();
+            .source_and_bind(table_name.clone(), None, None, true)?
+            .and_then(|source| {
+                if let Source::Table(table) = source {
+                    Some(table)
+                } else {
+                    None
+                }
+            })
+            .ok_or(DatabaseError::TableNotFound)?;
+        let index_metas = table.indexes.clone();
 
-        let scan_op = TableScanOperator::build(table_name.clone(), table_catalog);
+        let scan_op = TableScanOperator::build(table_name.clone(), table);
 
         Ok(LogicalPlan::new(
             Operator::Analyze(AnalyzeOperator {
                 table_name,
diff --git a/src/binder/copy.rs b/src/binder/copy.rs
index 6192565a..1ea6caf4 100644
--- a/src/binder/copy.rs
+++ b/src/binder/copy.rs
@@ -62,7 +62,7 @@ impl FromStr for ExtSource {
     }
 }
 
-impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
+impl<T: Transaction> Binder<'_, '_, T> {
     pub(super) fn bind_copy(
         &mut self,
         source: CopySource,
@@ -80,7 +80,7 @@ impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
             }
         };
 
-        if let Some(table) = self.context.table(Arc::new(table_name.to_string())) {
+        if let Some(table) = self.context.table(Arc::new(table_name.to_string()))? {
             let schema_ref = table.schema_ref().clone();
             let ext_source = ExtSource {
                 path: match target {
diff --git a/src/binder/create_index.rs b/src/binder/create_index.rs
index 7ef0da7e..65a04b32 100644
--- a/src/binder/create_index.rs
+++ b/src/binder/create_index.rs
@@ -1,4 +1,4 @@
-use crate::binder::{lower_case_name, Binder};
+use crate::binder::{lower_case_name, Binder, Source};
 use crate::errors::DatabaseError;
 use crate::expression::ScalarExpression;
 use crate::planner::operator::create_index::CreateIndexOperator;
@@ -10,7 +10,7 @@ use crate::types::index::IndexType;
 use sqlparser::ast::{ObjectName, OrderByExpr};
 use std::sync::Arc;
 
-impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
+impl<T: Transaction> Binder<'_, '_, T> {
     pub(crate) fn bind_create_index(
         &mut self,
         table_name: &ObjectName,
@@ -29,10 +29,14 @@ impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
             IndexType::Composite
         };
 
-        let table = self
+        let source = self
             .context
-            .table_and_bind(table_name.clone(), None, None)?;
-        let plan = TableScanOperator::build(table_name.clone(), table);
+            .source_and_bind(table_name.clone(), None, None, false)?
+            .ok_or(DatabaseError::SourceNotFound)?;
+        let plan = match source {
+            Source::Table(table) => TableScanOperator::build(table_name.clone(), table),
+            Source::View(view) => LogicalPlan::clone(&view.plan),
+        };
 
         let mut columns = Vec::with_capacity(exprs.len());
         for expr in exprs {
diff --git a/src/binder/create_table.rs b/src/binder/create_table.rs
index 938e0633..44c9ec8b 100644
--- a/src/binder/create_table.rs
+++ b/src/binder/create_table.rs
@@ -14,7 +14,7 @@ use crate::planner::LogicalPlan;
 use crate::storage::Transaction;
 use crate::types::LogicalType;
 
-impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
+impl<T: Transaction> Binder<'_, '_, T> {
     // TODO: TableConstraint
     pub(crate) fn bind_create_table(
         &mut self,
@@ -158,6 +158,7 @@ mod tests {
         let storage = RocksStorage::new(temp_dir.path())?;
         let transaction = storage.transaction()?;
         let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
+        let view_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
         let scala_functions = Default::default();
         let table_functions = Default::default();
 
@@ -165,6 +166,7 @@ mod tests {
         let mut binder = Binder::new(
             BinderContext::new(
                 &table_cache,
+                &view_cache,
                 &transaction,
                 &scala_functions,
                 &table_functions,
diff --git a/src/binder/create_view.rs b/src/binder/create_view.rs
index b663247d..1531d1fa 100644
--- a/src/binder/create_view.rs
+++ b/src/binder/create_view.rs
@@ -12,7 +12,7 @@ use sqlparser::ast::{Ident, ObjectName, Query};
 use std::sync::Arc;
 use ulid::Ulid;
 
-impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
+impl<T: Transaction> Binder<'_, '_, T> {
     pub(crate) fn bind_create_view(
         &mut self,
         or_replace: &bool,
diff --git a/src/binder/delete.rs b/src/binder/delete.rs
index 827eceae..192e21e6 100644
--- a/src/binder/delete.rs
+++ b/src/binder/delete.rs
@@ -1,4 +1,4 @@
-use crate::binder::{lower_case_name, Binder};
+use crate::binder::{lower_case_name, Binder, Source};
 use crate::errors::DatabaseError;
 use crate::planner::operator::delete::DeleteOperator;
 use crate::planner::operator::table_scan::TableScanOperator;
@@ -8,7 +8,7 @@ use crate::storage::Transaction;
 use sqlparser::ast::{Expr, TableAlias, TableFactor, TableWithJoins};
 use std::sync::Arc;
 
-impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
+impl<T: Transaction> Binder<'_, '_, T> {
     pub(crate) fn bind_delete(
         &mut self,
         from: &TableWithJoins,
@@ -23,15 +23,20 @@ impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
             table_alias = Some(Arc::new(name.value.to_lowercase()));
             alias_idents = Some(columns);
         }
-        let table_catalog =
-            self.context
-                .table_and_bind(table_name.clone(), table_alias.clone(), None)?;
-        let primary_key_column = table_catalog
-            .columns()
+        let source = self
+            .context
+            .source_and_bind(table_name.clone(), table_alias.as_ref(), None, false)?
+            .ok_or(DatabaseError::SourceNotFound)?;
+        let schema_buf = self.table_schema_buf.entry(table_name.clone()).or_default();
+        let primary_key_column = source
+            .columns(schema_buf)
             .find(|column| column.desc().is_primary)
             .cloned()
             .unwrap();
-        let mut plan = TableScanOperator::build(table_name.clone(), table_catalog);
+        let mut plan = match source {
+            Source::Table(table) => TableScanOperator::build(table_name.clone(), table),
+            Source::View(view) => LogicalPlan::clone(&view.plan),
+        };
 
         if let Some(alias_idents) = alias_idents {
             plan =
diff --git a/src/binder/describe.rs b/src/binder/describe.rs
index 159d4ad1..2a99791b 100644
--- a/src/binder/describe.rs
+++ b/src/binder/describe.rs
@@ -7,7 +7,7 @@ use crate::storage::Transaction;
 use sqlparser::ast::ObjectName;
 use std::sync::Arc;
 
-impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
+impl<T: Transaction> Binder<'_, '_, T> {
     pub(crate) fn bind_describe(
         &mut self,
         name: &ObjectName,
diff --git a/src/binder/distinct.rs b/src/binder/distinct.rs
index 2d03989d..fa88d1eb 100644
--- a/src/binder/distinct.rs
+++ b/src/binder/distinct.rs
@@ -4,7 +4,7 @@ use crate::planner::operator::aggregate::AggregateOperator;
 use crate::planner::LogicalPlan;
 use crate::storage::Transaction;
 
-impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
+impl<T: Transaction> Binder<'_, '_, T> {
     pub fn bind_distinct(
         &mut self,
         children: LogicalPlan,
diff --git a/src/binder/drop_table.rs b/src/binder/drop_table.rs
index 39dc19dd..ec31dfce 100644
--- a/src/binder/drop_table.rs
+++ b/src/binder/drop_table.rs
@@ -7,7 +7,7 @@ use crate::storage::Transaction;
 use sqlparser::ast::ObjectName;
 use std::sync::Arc;
 
-impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
+impl<T: Transaction> Binder<'_, '_, T> {
     pub(crate) fn bind_drop_table(
         &mut self,
         name: &ObjectName,
@@ -15,13 +15,12 @@ impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
     ) -> Result<LogicalPlan, DatabaseError> {
         let table_name = Arc::new(lower_case_name(name)?);
 
-        let plan = LogicalPlan::new(
+        Ok(LogicalPlan::new(
             Operator::DropTable(DropTableOperator {
                 table_name,
                 if_exists: *if_exists,
             }),
             vec![],
-        );
-        Ok(plan)
+        ))
     }
 }
diff --git a/src/binder/explain.rs b/src/binder/explain.rs
index 3fe29c32..8620e1cd 100644
--- a/src/binder/explain.rs
+++ b/src/binder/explain.rs
@@ -4,7 +4,7 @@ use crate::planner::operator::Operator;
 use crate::planner::LogicalPlan;
 use crate::storage::Transaction;
 
-impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
+impl<T: Transaction> Binder<'_, '_, T> {
     pub(crate) fn bind_explain(&mut self, plan: LogicalPlan) -> Result<LogicalPlan, DatabaseError> {
         Ok(LogicalPlan::new(Operator::Explain, vec![plan]))
     }
diff --git a/src/binder/expr.rs b/src/binder/expr.rs
index 1e102825..e8f67f2e 100644
--- a/src/binder/expr.rs
+++ b/src/binder/expr.rs
@@ -1,4 +1,4 @@
-use crate::catalog::{ColumnCatalog, ColumnRef};
+use crate::catalog::{ColumnCatalog, ColumnRef, TableName};
 use crate::errors::DatabaseError;
 use crate::expression;
 use crate::expression::agg::AggKind;
@@ -7,6 +7,7 @@ use sqlparser::ast::{
     BinaryOperator, CharLengthUnits, DataType, Expr, Function, FunctionArg, FunctionArgExpr,
     Ident, Query, UnaryOperator,
 };
+use std::collections::HashMap;
 use std::slice;
 use std::sync::Arc;
 
@@ -15,7 +16,7 @@ use crate::expression::function::scala::{ArcScalarFunctionImpl, ScalarFunction};
 use crate::expression::function::table::{ArcTableFunctionImpl, TableFunction};
 use crate::expression::function::FunctionSummary;
 use crate::expression::{AliasType, ScalarExpression};
-use crate::planner::LogicalPlan;
+use crate::planner::{LogicalPlan, SchemaOutput};
 use crate::storage::Transaction;
 use crate::types::value::{DataValue, Utf8Type};
use crate::types::{ColumnId, LogicalType};
@@ -39,7 +40,7 @@ macro_rules! try_default {
     };
 }
 
-impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
+impl<'a, T: Transaction> Binder<'a, '_, T> {
     pub(crate) fn bind_expr(&mut self, expr: &Expr) -> Result<ScalarExpression, DatabaseError> {
         match expr {
             Expr::Identifier(ident) => {
@@ -249,6 +250,7 @@ impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
     ) -> Result<(LogicalPlan, ColumnRef), DatabaseError> {
         let BinderContext {
             table_cache,
+            view_cache,
             transaction,
             scala_functions,
             table_functions,
@@ -258,6 +260,7 @@ impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
         let mut binder = Binder::new(
             BinderContext::new(
                 table_cache,
+                view_cache,
                 *transaction,
                 scala_functions,
                 table_functions,
@@ -324,46 +327,53 @@ impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
             try_default!(&full_name.0, full_name.1);
         }
         if let Some(table) = full_name.0.or(bind_table_name) {
-            let table_catalog = self.context.bind_table(&table, self.parent)?;
-
-            let column_catalog = table_catalog
-                .get_column_by_name(&full_name.1)
-                .ok_or_else(|| DatabaseError::NotFound("column", full_name.1))?;
-            Ok(ScalarExpression::ColumnRef(column_catalog.clone()))
+            let source = self.context.bind_source(&table, self.parent)?;
+            let schema_buf = self.table_schema_buf.entry(Arc::new(table)).or_default();
+
+            Ok(ScalarExpression::ColumnRef(
+                source
+                    .column(&full_name.1, schema_buf)
+                    .ok_or_else(|| DatabaseError::NotFound("column", full_name.1.to_string()))?,
+            ))
         } else {
-            let op = |got_column: &mut Option<ScalarExpression>, context: &BinderContext<'a, T>| {
-                for ((_, alias, _), table_catalog) in context.bind_table.iter() {
-                    if got_column.is_some() {
-                        break;
-                    }
-                    if let Some(alias) = alias {
-                        *got_column = self.context.expr_aliases.iter().find_map(
-                            |((alias_table, alias_column), expr)| {
-                                matches!(
-                                    alias_table
-                                        .as_ref()
-                                        .map(|table_name| table_name == alias.as_ref()
-                                            && alias_column == &full_name.1),
-                                    Some(true)
-                                )
-                                .then(|| expr.clone())
-                            },
-                        );
-                    } else if let Some(column_catalog) =
-                        table_catalog.get_column_by_name(&full_name.1)
-                    {
-                        *got_column = Some(ScalarExpression::ColumnRef(column_catalog.clone()));
+            let op =
+                |got_column: &mut Option<ScalarExpression>,
+                 context: &BinderContext<'a, T>,
+                 table_schema_buf: &mut HashMap<TableName, Option<SchemaOutput>>| {
+                    for ((table_name, alias, _), source) in context.bind_table.iter() {
+                        if got_column.is_some() {
+                            break;
+                        }
+                        if let Some(alias) = alias {
+                            *got_column = self.context.expr_aliases.iter().find_map(
+                                |((alias_table, alias_column), expr)| {
+                                    matches!(
+                                        alias_table
+                                            .as_ref()
+                                            .map(|table_name| table_name == alias.as_ref()
+                                                && alias_column == &full_name.1),
+                                        Some(true)
+                                    )
+                                    .then(|| expr.clone())
+                                },
+                            );
+                        } else if let Some(column) = {
+                            let schema_buf =
+                                table_schema_buf.entry(table_name.clone()).or_default();
+                            source.column(&full_name.1, schema_buf)
+                        } {
+                            *got_column = Some(ScalarExpression::ColumnRef(column));
+                        }
                     }
-                }
-            };
+                };
             // handle col syntax
             let mut got_column = None;
-            op(&mut got_column, &self.context);
+            op(&mut got_column, &self.context, &mut self.table_schema_buf);
             if let Some(parent) = self.parent {
-                op(&mut got_column, &parent.context);
+                op(&mut got_column, &parent.context, &mut self.table_schema_buf);
             }
-            Ok(got_column.ok_or_else(|| DatabaseError::NotFound("column", full_name.1))?)
+            Ok(got_column.ok_or(DatabaseError::NotFound("column", full_name.1))?)
         }
     }
diff --git a/src/binder/insert.rs b/src/binder/insert.rs
index 6047cc09..87e8491f 100644
--- a/src/binder/insert.rs
+++ b/src/binder/insert.rs
@@ -12,7 +12,7 @@ use sqlparser::ast::{Expr, Ident, ObjectName};
 use std::slice;
 use std::sync::Arc;
 
-impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
+impl<T: Transaction> Binder<'_, '_, T> {
     pub(crate) fn bind_insert(
         &mut self,
         name: &ObjectName,
@@ -25,14 +25,16 @@ impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
         self.context.allow_default = true;
         let table_name = Arc::new(lower_case_name(name)?);
 
-        let table = self
+        let source = self
             .context
-            .table_and_bind(table_name.clone(), None, None)?;
+            .source_and_bind(table_name.clone(), None, None, false)?
+            .ok_or(DatabaseError::TableNotFound)?;
         let mut _schema_ref = None;
         let values_len = expr_rows[0].len();
 
         if idents.is_empty() {
-            let temp_schema_ref = table.schema_ref().clone();
+            let schema_buf = self.table_schema_buf.entry(table_name.clone()).or_default();
+            let temp_schema_ref = source.schema_ref(schema_buf);
             if values_len > temp_schema_ref.len() {
                 return Err(DatabaseError::ValuesLenMismatch(
                     temp_schema_ref.len(),
diff --git a/src/binder/mod.rs b/src/binder/mod.rs
index 54ed7640..1c06401a 100644
--- a/src/binder/mod.rs
+++ b/src/binder/mod.rs
@@ -22,13 +22,15 @@ use std::collections::{BTreeMap, HashMap, HashSet};
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 
-use crate::catalog::{TableCatalog, TableName};
+use crate::catalog::view::View;
+use crate::catalog::{ColumnRef, TableCatalog, TableName};
 use crate::db::{ScalaFunctions, TableFunctions};
 use crate::errors::DatabaseError;
 use crate::expression::ScalarExpression;
 use crate::planner::operator::join::JoinType;
-use crate::planner::LogicalPlan;
-use crate::storage::{TableCache, Transaction};
+use crate::planner::{LogicalPlan, SchemaOutput};
+use crate::storage::{TableCache, Transaction, ViewCache};
+use crate::types::tuple::SchemaRef;
 
 pub enum InputRefType {
     AggCall,
@@ -45,6 +47,7 @@ pub fn command_type(stmt: &Statement) -> Result<CommandType, DatabaseError> {
     match stmt {
         Statement::CreateTable { .. }
         | Statement::CreateIndex { .. }
+        | Statement::CreateView { .. }
         | Statement::AlterTable { .. }
         | Statement::Drop { .. } => Ok(CommandType::DDL),
         Statement::Query(_)
@@ -81,15 +84,21 @@ pub enum SubQueryType {
     InSubQuery(bool, LogicalPlan),
 }
 
+#[derive(Debug, Clone)]
+pub enum Source<'a> {
+    Table(&'a TableCatalog),
+    View(&'a View),
+}
+
 #[derive(Clone)]
 pub struct BinderContext<'a, T: Transaction> {
     pub(crate) scala_functions: &'a ScalaFunctions,
     pub(crate) table_functions: &'a TableFunctions,
     pub(crate) table_cache: &'a TableCache,
+    pub(crate) view_cache: &'a ViewCache,
     pub(crate) transaction: &'a T,
     // Tips: When there are multiple tables and Wildcard, use BTreeMap to ensure that the order of the output tables is certain.
-    pub(crate) bind_table:
-        BTreeMap<(TableName, Option<TableName>, Option<JoinType>), &'a TableCatalog>,
+    pub(crate) bind_table: BTreeMap<(TableName, Option<TableName>, Option<JoinType>), Source<'a>>,
     // alias
     expr_aliases: BTreeMap<(Option<String>, String), ScalarExpression>,
     table_aliases: HashMap<TableName, TableName>,
@@ -106,9 +115,53 @@ pub struct BinderContext<'a, T: Transaction> {
     pub(crate) allow_default: bool,
 }
 
+impl Source<'_> {
+    pub(crate) fn column(
+        &self,
+        name: &str,
+        schema_buf: &mut Option<SchemaOutput>,
+    ) -> Option<ColumnRef> {
+        match self {
+            Source::Table(table) => table.get_column_by_name(name),
+            Source::View(view) => schema_buf
+                .get_or_insert_with(|| view.plan.output_schema_direct())
+                .columns()
+                .find(|column| column.name() == name),
+        }
+        .cloned()
+    }
+
+    pub(crate) fn columns<'a>(
+        &'a self,
+        schema_buf: &'a mut Option<SchemaOutput>,
+    ) -> Box<dyn Iterator<Item = &'a ColumnRef> + 'a> {
+        match self {
+            Source::Table(table) => Box::new(table.columns()),
+            Source::View(view) => Box::new(
+                schema_buf
+                    .get_or_insert_with(|| view.plan.output_schema_direct())
+                    .columns(),
+            ),
+        }
+    }
+
+    pub(crate) fn schema_ref(&self, schema_buf: &mut Option<SchemaOutput>) -> SchemaRef {
+        match self {
+            Source::Table(table) => table.schema_ref().clone(),
+            Source::View(view) => {
+                match schema_buf.get_or_insert_with(|| view.plan.output_schema_direct()) {
+                    SchemaOutput::Schema(schema) => Arc::new(schema.clone()),
+                    SchemaOutput::SchemaRef(schema_ref) => schema_ref.clone(),
+                }
+            }
+        }
+    }
+}
+
 impl<'a, T: Transaction> BinderContext<'a, T> {
     pub fn new(
         table_cache: &'a TableCache,
+        view_cache: &'a ViewCache,
         transaction: &'a T,
         scala_functions: &'a ScalaFunctions,
         table_functions: &'a TableFunctions,
@@ -118,6 +171,7 @@ impl<'a, T: Transaction> BinderContext<'a, T> {
             scala_functions,
             table_functions,
             table_cache,
+            view_cache,
             transaction,
             bind_table: Default::default(),
             expr_aliases: Default::default(),
@@ -162,7 +216,7 @@ impl<'a, T: Transaction> BinderContext<'a, T> {
         self.sub_queries.remove(&self.bind_step)
     }
 
-    pub fn table(&self, table_name: TableName) -> Option<&TableCatalog> {
+    pub fn table(&self, table_name: TableName) -> Result<Option<&TableCatalog>, DatabaseError> {
         if let Some(real_name) = self.table_aliases.get(table_name.as_ref()) {
             self.transaction.table(self.table_cache, real_name.clone())
         } else {
@@ -170,38 +224,76 @@ impl<'a, T: Transaction> BinderContext<'a, T> {
         }
     }
 
-    pub fn table_and_bind(
+    pub fn view(&self, view_name: TableName) -> Result<Option<&View>, DatabaseError> {
+        if let Some(real_name) = self.table_aliases.get(view_name.as_ref()) {
+            self.transaction.view(
+                self.view_cache,
+                real_name.clone(),
+                (self.transaction, self.table_cache),
+            )
+        } else {
+            self.transaction.view(
+                self.view_cache,
+                view_name.clone(),
+                (self.transaction, self.table_cache),
+            )
+        }
+    }
+
+    #[allow(unused_assignments)]
+    pub fn source_and_bind(
         &mut self,
         table_name: TableName,
-        alias: Option<TableName>,
+        alias: Option<&TableName>,
         join_type: Option<JoinType>,
-    ) -> Result<&TableCatalog, DatabaseError> {
-        let table = if let Some(real_name) = self.table_aliases.get(table_name.as_ref()) {
+        only_table: bool,
+    ) -> Result<Option<Source>, DatabaseError> {
+        let mut source = None;
+
+        source = if let Some(real_name) = self.table_aliases.get(table_name.as_ref()) {
             self.transaction.table(self.table_cache, real_name.clone())
         } else {
             self.transaction.table(self.table_cache, table_name.clone())
+        }?
+        .map(Source::Table);
+
+        if source.is_none() && !only_table {
+            source = if let Some(real_name) = self.table_aliases.get(table_name.as_ref()) {
+                self.transaction.view(
+                    self.view_cache,
+                    real_name.clone(),
+                    (self.transaction, self.table_cache),
+                )
+            } else {
+                self.transaction.view(
+                    self.view_cache,
+                    table_name.clone(),
+                    (self.transaction, self.table_cache),
+                )
+            }?
+            .map(Source::View);
+        }
-        .ok_or(DatabaseError::TableNotFound)?;
-
-        self.bind_table
-            .insert((table_name.clone(), alias, join_type), table);
-
-        Ok(table)
+        if let Some(source) = &source {
+            self.bind_table.insert(
+                (table_name.clone(), alias.cloned(), join_type),
+                source.clone(),
+            );
+        }
+        Ok(source)
     }
 
-    /// get table from bindings
-    pub fn bind_table<'b: 'a>(
+    pub fn bind_source<'b: 'a>(
        &self,
         table_name: &str,
         parent: Option<&'b Binder<'a, 'b, T>>,
-    ) -> Result<&TableCatalog, DatabaseError> {
-        if let Some(table_catalog) = self.bind_table.iter().find(|((t, alias, _), _)| {
+    ) -> Result<&Source, DatabaseError> {
+        if let Some(source) = self.bind_table.iter().find(|((t, alias, _), _)| {
             t.as_str() == table_name
                 || matches!(alias.as_ref().map(|a| a.as_str() == table_name), Some(true))
         }) {
-            Ok(table_catalog.1)
+            Ok(source.1)
         } else if let Some(binder) = parent {
-            binder.context.bind_table(table_name, binder.parent)
+            binder.context.bind_source(table_name, binder.parent)
         } else {
             Err(DatabaseError::InvalidTable(table_name.into()))
         }
@@ -239,12 +331,17 @@ impl<'a, T: Transaction> BinderContext<'a, T> {
 
 pub struct Binder<'a, 'b, T: Transaction> {
     context: BinderContext<'a, T>,
+    table_schema_buf: HashMap<TableName, Option<SchemaOutput>>,
     pub(crate) parent: Option<&'b Binder<'a, 'b, T>>,
 }
 
 impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
     pub fn new(context: BinderContext<'a, T>, parent: Option<&'b Binder<'a, 'b, T>>) -> Self {
-        Binder { context, parent }
+        Binder {
+            context,
+            table_schema_buf: Default::default(),
+            parent,
+        }
     }
 
     pub fn bind(&mut self, stmt: &Statement) -> Result<LogicalPlan, DatabaseError> {
@@ -394,7 +491,7 @@ pub mod test {
     use crate::errors::DatabaseError;
     use crate::planner::LogicalPlan;
     use crate::storage::rocksdb::RocksStorage;
-    use crate::storage::{Storage, TableCache, Transaction};
+    use crate::storage::{Storage, TableCache, Transaction, ViewCache};
     use crate::types::ColumnId;
     use crate::types::LogicalType::Integer;
     use crate::utils::lru::ShardingLruCache;
@@ -407,6 +504,7 @@ pub mod test {
     pub(crate) struct TableState<S: Storage> {
         pub(crate) table: TableCatalog,
         pub(crate) table_cache: Arc<TableCache>,
+        pub(crate) view_cache: Arc<ViewCache>,
         pub(crate) storage: S,
     }
 
@@ -418,6 +516,7 @@ pub mod test {
             let mut binder = Binder::new(
                 BinderContext::new(
                     &self.table_cache,
+                    &self.view_cache,
                    &transaction,
                     &scala_functions,
                     &table_functions,
@@ -438,11 +537,12 @@ pub mod test {
     pub(crate) fn build_t1_table() -> Result<TableState<RocksStorage>, DatabaseError> {
         let temp_dir = TempDir::new().expect("unable to create temporary working directory");
         let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
+        let view_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
         let storage = build_test_catalog(&table_cache, temp_dir.path())?;
         let table = {
             let transaction = storage.transaction()?;
             transaction
-                .table(&table_cache, Arc::new("t1".to_string()))
+                .table(&table_cache, Arc::new("t1".to_string()))?
+                .unwrap()
                 .clone()
         };
 
         Ok(TableState {
             table,
             table_cache,
+            view_cache,
             storage,
         })
     }
diff --git a/src/binder/select.rs b/src/binder/select.rs
index 646e572f..cecfa660 100644
--- a/src/binder/select.rs
+++ b/src/binder/select.rs
@@ -14,7 +14,9 @@ use crate::{
     types::value::DataValue,
 };
 
-use super::{lower_case_name, lower_ident, Binder, BinderContext, QueryBindStep, SubQueryType};
+use super::{
+    lower_case_name, lower_ident, Binder, BinderContext, QueryBindStep, Source, SubQueryType,
+};
 use crate::catalog::{ColumnCatalog, ColumnRef, ColumnSummary, TableName};
 use crate::errors::DatabaseError;
@@ -25,7 +27,7 @@ use crate::planner::operator::insert::InsertOperator;
 use crate::planner::operator::join::JoinCondition;
 use crate::planner::operator::sort::{SortField, SortOperator};
 use crate::planner::operator::union::UnionOperator;
-use crate::planner::LogicalPlan;
+use crate::planner::{LogicalPlan, SchemaOutput};
 use crate::storage::Transaction;
 use crate::types::tuple::{Schema, SchemaRef};
 use crate::types::{ColumnId, LogicalType};
@@ -312,7 +314,7 @@ impl<'a: 'b, 'b, T: Transaction> Binder<'a, 'b, T> {
 
                 self.context
                     .bind_table
-                    .insert((table_name, table_alias, joint_type), table);
+                    .insert((table_name, table_alias, joint_type), Source::Table(table));
 
                 plan
             } else {
                 unreachable!()
             }
@@ -356,7 +358,7 @@ impl<'a: 'b, 'b, T: Transaction> Binder<'a, 'b, T> {
             alias_column.set_ref_table(
                 table_alias.clone(),
                 column.id().unwrap_or(ColumnId::new()),
-                true,
+                false,
             );
 
             let alias_column_expr = ScalarExpression::Alias {
@@ -391,16 +393,19 @@ impl<'a: 'b, 'b, T: Transaction> Binder<'a, 'b, T> {
             alias_idents = Some(columns);
         }
 
-        let table_catalog =
-            self.context
-                .table_and_bind(table_name.clone(), table_alias.clone(), join_type)?;
-        let mut scan_op = TableScanOperator::build(table_name.clone(), table_catalog);
+        let source = self
+            .context
+            .source_and_bind(table_name.clone(), table_alias.as_ref(), join_type, false)?
+            .ok_or(DatabaseError::SourceNotFound)?;
+        let mut plan = match source {
+            Source::Table(table) => TableScanOperator::build(table_name.clone(), table),
+            Source::View(view) => LogicalPlan::clone(&view.plan),
+        };
 
         if let Some(idents) = alias_idents {
-            scan_op = self.bind_alias(scan_op, idents, table_alias.unwrap(), table_name.clone())?;
+            plan = self.bind_alias(plan, idents, table_alias.unwrap(), table_name.clone())?;
         }
-
-        Ok(scan_op)
+        Ok(plan)
     }
 
     /// Normalize select item.
@@ -441,7 +446,11 @@ impl<'a: 'b, 'b, T: Transaction> Binder<'a, 'b, T> {
                     let mut join_used = HashSet::with_capacity(self.context.using.len());
 
                     for (table_name, alias, _) in self.context.bind_table.keys() {
-                        self.bind_table_column_refs(
+                        let schema_buf =
+                            self.table_schema_buf.entry(table_name.clone()).or_default();
+                        Self::bind_table_column_refs(
+                            &self.context,
+                            schema_buf,
                             &mut select_items,
                             alias.as_ref().unwrap_or(table_name).clone(),
                             Some(&mut join_used),
                         )?;
                     }
                 }
                 SelectItem::QualifiedWildcard(table_name, _) => {
-                    self.bind_table_column_refs(
+                    let table_name = Arc::new(lower_case_name(table_name)?);
+                    let schema_buf = self.table_schema_buf.entry(table_name.clone()).or_default();
+
+                    Self::bind_table_column_refs(
+                        &self.context,
+                        schema_buf,
                         &mut select_items,
-                        Arc::new(lower_case_name(table_name)?),
+                        table_name,
                         None,
                     )?;
                 }
@@ -461,8 +475,10 @@ impl<'a: 'b, 'b, T: Transaction> Binder<'a, 'b, T> {
         Ok(select_items)
     }
 
+    #[allow(unused_assignments)]
     fn bind_table_column_refs(
-        &self,
+        context: &BinderContext<'a, T>,
+        schema_buf: &mut Option<SchemaOutput>,
         exprs: &mut Vec<ScalarExpression>,
         table_name: TableName,
         mut join_used: Option<&mut HashSet<String>>,
@@ -474,12 +490,12 @@ impl<'a: 'b, 'b, T: Transaction> Binder<'a, 'b, T> {
             context.using.contains(column_name)
                 && matches!(join_used.map(|used| used.contains(column_name)), Some(true))
         };
-        for (_, alias_expr) in self.context.expr_aliases.iter().filter(|(_, expr)| {
+        for (_, alias_expr) in context.expr_aliases.iter().filter(|(_, expr)| {
             if let ScalarExpression::ColumnRef(col) = expr.unpack_alias_ref() {
                 let column_name = col.name();
 
                 if Some(&table_name) == col.table_name()
-                    && !fn_used(column_name, &self.context, join_used.as_deref())
+                    && !fn_used(column_name, context, join_used.as_deref())
                 {
                     if let Some(used) = join_used.as_mut() {
                         used.insert(column_name.to_string());
@@ -496,14 +512,19 @@ impl<'a: 'b, 'b, T: Transaction> Binder<'a, 'b, T> {
             return Ok(());
         }
 
-        let table = self
-            .context
-            .table(table_name.clone())
-            .ok_or(DatabaseError::TableNotFound)?;
-        for column in table.columns() {
+        let mut source = None;
+
+        source = context.table(table_name.clone())?.map(Source::Table);
+        if source.is_none() {
+            source = context.view(table_name)?.map(Source::View);
+        }
+        for column in source
+            .ok_or(DatabaseError::SourceNotFound)?
+            .columns(schema_buf)
+        {
             let column_name = column.name();
 
-            if fn_used(column_name, &self.context, join_used.as_deref()) {
+            if fn_used(column_name, context, join_used.as_deref()) {
                 continue;
             }
             let expr = ScalarExpression::ColumnRef(column.clone());
@@ -536,6 +557,7 @@ impl<'a: 'b, 'b, T: Transaction> Binder<'a, 'b, T> {
         };
         let BinderContext {
             table_cache,
+            view_cache,
             transaction,
             scala_functions,
             table_functions,
@@ -545,6 +567,7 @@ impl<'a: 'b, 'b, T: Transaction> Binder<'a, 'b, T> {
         let mut binder = Binder::new(
             BinderContext::new(
                 table_cache,
+                view_cache,
                 *transaction,
                 scala_functions,
                 table_functions,
@@ -718,26 +741,33 @@ impl<'a: 'b, 'b, T: Transaction> Binder<'a, 'b, T> {
         let mut left_table_force_nullable = false;
         let mut left_table = None;
 
-        for ((_, _, join_option), table) in bind_tables {
+        for ((table_name, _, join_option), table) in bind_tables {
             if let Some(join_type) = join_option {
                 let (left_force_nullable, right_force_nullable) = joins_nullable(join_type);
-                table_force_nullable.push((table, right_force_nullable));
+                table_force_nullable.push((table_name, table, right_force_nullable));
                 left_table_force_nullable = left_force_nullable;
             } else {
-                left_table = Some(table);
+                left_table = Some((table_name, table));
             }
         }
 
-        if let Some(table) = left_table {
-            table_force_nullable.push((table, left_table_force_nullable));
+        if let Some((table_name, table)) = left_table {
+            table_force_nullable.push((table_name, table, left_table_force_nullable));
         }
 
         for column in select_items {
             if let ScalarExpression::ColumnRef(col) = column {
                 let _ = table_force_nullable
                     .iter()
-                    .find(|(table, _)| table.contains_column(col.name()))
-                    .map(|(_, nullable)| {
+                    .find(|(table_name, source, _)| {
+                        let schema_buf = self
+                            .table_schema_buf
+                            .entry((*table_name).clone())
+                            .or_default();
+
+                        source.column(col.name(), schema_buf).is_some()
+                    })
+                    .map(|(_, _, nullable)| {
                         if let Some(new_column) = col.nullable_for_join(*nullable) {
                             *col = new_column;
                         }
diff --git a/src/binder/show.rs b/src/binder/show.rs
index b3b54b40..9d9b0536 100644
--- a/src/binder/show.rs
+++ b/src/binder/show.rs
@@ -4,7 +4,7 @@ use crate::planner::operator::Operator;
 use crate::planner::LogicalPlan;
 use crate::storage::Transaction;
 
-impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
+impl<T: Transaction> Binder<'_, '_, T> {
     pub(crate) fn bind_show_tables(&mut self) -> Result<LogicalPlan, DatabaseError> {
         Ok(LogicalPlan::new(Operator::Show, vec![]))
     }
diff --git a/src/binder/truncate.rs b/src/binder/truncate.rs
index 2dbd9e86..a1a0ee98 100644
--- a/src/binder/truncate.rs
+++ b/src/binder/truncate.rs
@@ -7,7 +7,7 @@ use crate::storage::Transaction;
 use sqlparser::ast::ObjectName;
 use std::sync::Arc;
 
-impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
+impl<T: Transaction> Binder<'_, '_, T> {
     pub(crate) fn bind_truncate(
         &mut self,
         name: &ObjectName,
diff --git a/src/binder/update.rs b/src/binder/update.rs
index fe803a3f..9ab28994 100644
--- a/src/binder/update.rs
+++ b/src/binder/update.rs
@@ -10,7 +10,7 @@ use sqlparser::ast::{Assignment, Expr, TableFactor, TableWithJoins};
 use std::slice;
 use std::sync::Arc;
 
-impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
+impl<T: Transaction> Binder<'_, '_, T> {
     pub(crate) fn bind_update(
         &mut self,
         to: &TableWithJoins,
diff --git a/src/catalog/table.rs b/src/catalog/table.rs
index 569ba97b..23c2601d 100644
--- a/src/catalog/table.rs
+++ b/src/catalog/table.rs
@@ -52,6 +52,7 @@ impl TableCatalog {
             .map(|(_, i)| &self.schema_ref[*i])
     }
 
+    #[allow(dead_code)]
     pub(crate) fn contains_column(&self, name: &str) -> bool {
         self.column_idxs.contains_key(name)
     }
diff --git a/src/db.rs b/src/db.rs
index f80e445f..48a64eac 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -13,7 +13,7 @@ use crate::optimizer::rule::normalization::NormalizationRuleImpl;
 use crate::parser::parse_sql;
 use crate::planner::LogicalPlan;
 use crate::storage::rocksdb::RocksStorage;
-use crate::storage::{StatisticsMetaCache, Storage, TableCache, Transaction};
+use crate::storage::{StatisticsMetaCache, Storage, TableCache, Transaction, ViewCache};
 use crate::types::tuple::{SchemaRef, Tuple};
 use crate::utils::lru::ShardingLruCache;
 use ahash::HashMap;
@@ -70,6 +70,7 @@ impl DataBaseBuilder {
         let storage = RocksStorage::new(self.path)?;
         let meta_cache = Arc::new(ShardingLruCache::new(256, 8, RandomState::new())?);
         let table_cache = Arc::new(ShardingLruCache::new(48, 4, RandomState::new())?);
+        let view_cache = Arc::new(ShardingLruCache::new(12, 4, RandomState::new())?);
 
         Ok(Database {
             storage,
@@ -78,6 +79,7 @@ impl DataBaseBuilder {
             mdl: Arc::new(RwLock::new(())),
             meta_cache,
             table_cache,
+            view_cache,
         })
     }
 }
@@ -89,6 +91,7 @@ pub struct Database<S: Storage> {
     mdl: Arc<RwLock<()>>,
     pub(crate) meta_cache: Arc<StatisticsMetaCache>,
     pub(crate) table_cache: Arc<TableCache>,
+    pub(crate) view_cache: Arc<ViewCache>,
 }
 
 impl<S: Storage> Database<S> {
@@ -109,6 +112,7 @@ impl<S: Storage> Database<S> {
         let mut plan = Self::build_plan(
             stmt,
             &self.table_cache,
+            &self.view_cache,
             &self.meta_cache,
             &transaction,
             &self.scala_functions,
@@ -118,7 +122,7 @@ impl<S: Storage> Database<S> {
         let schema = plan.output_schema().clone();
         let iterator = build_write(
             plan,
-            (&self.table_cache, &self.meta_cache),
+            (&self.table_cache, &self.view_cache, &self.meta_cache),
             &mut transaction,
         );
         let tuples = try_collect(iterator)?;
@@ -139,12 +143,14 @@ impl<S: Storage> Database<S> {
             _guard: guard,
             meta_cache: self.meta_cache.clone(),
             table_cache: self.table_cache.clone(),
+            view_cache: self.view_cache.clone(),
         })
     }
 
     pub(crate) fn build_plan(
         stmt: &Statement,
         table_cache: &TableCache,
+        view_cache: &ViewCache,
         meta_cache: &StatisticsMetaCache,
         transaction: &<S as Storage>::TransactionType<'_>,
         scala_functions: &ScalaFunctions,
@@ -153,6 +159,7 @@ impl<S: Storage> Database<S> {
         let mut binder = Binder::new(
             BinderContext::new(
                 table_cache,
+                view_cache,
                 transaction,
                 scala_functions,
                 table_functions,
@@ -265,6 +272,7 @@ pub struct DBTransaction<'a, S: Storage + 'a> {
     _guard: ArcRwLockReadGuard<RawRwLock, ()>,
     pub(crate) meta_cache: Arc<StatisticsMetaCache>,
     pub(crate) table_cache: Arc<TableCache>,
+    pub(crate) view_cache: Arc<ViewCache>,
 }
 
 impl<S: Storage> DBTransaction<'_, S> {
@@ -282,6 +290,7 @@ impl<S: Storage> DBTransaction<'_, S> {
         let mut plan = Database::<S>::build_plan(
             stmt,
             &self.table_cache,
+            &self.view_cache,
             &self.meta_cache,
             &self.inner,
             &self.scala_functions,
@@ -289,7 +298,11 @@ impl<S: Storage> DBTransaction<'_, S> {
         let schema = plan.output_schema().clone();
-        let executor = build_write(plan, (&self.table_cache, &self.meta_cache), &mut self.inner);
+        let executor = build_write(
+            plan,
+            (&self.table_cache, &self.view_cache, &self.meta_cache),
+            &mut self.inner,
+        );
 
         Ok((schema, try_collect(executor)?))
     }
@@ -399,7 +412,7 @@ pub(crate) mod test {
             ColumnDesc::new(LogicalType::Integer, false, false, None).unwrap(),
         );
         let number_column_id = schema[0].id().unwrap();
-        column.set_ref_table(Arc::new("a".to_string()), number_column_id, true);
+        column.set_ref_table(Arc::new("a".to_string()), number_column_id, false);
 
         debug_assert_eq!(schema, Arc::new(vec![ColumnRef::from(column)]));
         debug_assert_eq!(
diff --git a/src/errors.rs b/src/errors.rs
index f5320b39..e903fd45 100644
--- a/src/errors.rs
+++ b/src/errors.rs
@@ -120,6 +120,8 @@ pub enum DatabaseError {
     ),
     #[error("the number of caches cannot be divisible by the number of shards")]
     ShardingNotAlign,
#[error("the view not found")] + SourceNotFound, #[error("the table already exists")] TableExists, #[error("the table not found")] @@ -152,6 +154,4 @@ pub enum DatabaseError { ValuesLenMismatch(usize, usize), #[error("the view already exists")] ViewExists, - #[error("the view not found")] - ViewNotFound, } diff --git a/src/execution/ddl/add_column.rs b/src/execution/ddl/add_column.rs index 863d2efc..09bd0398 100644 --- a/src/execution/ddl/add_column.rs +++ b/src/execution/ddl/add_column.rs @@ -1,6 +1,6 @@ use crate::execution::{build_read, Executor, WriteExecutor}; use crate::planner::LogicalPlan; -use crate::storage::{StatisticsMetaCache, TableCache}; +use crate::storage::{StatisticsMetaCache, TableCache, ViewCache}; use crate::types::index::{Index, IndexType}; use crate::types::tuple::Tuple; use crate::types::tuple_builder::TupleBuilder; @@ -28,7 +28,7 @@ impl From<(AddColumnOperator, LogicalPlan)> for AddColumn { impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for AddColumn { fn execute_mut( mut self, - cache: (&'a TableCache, &'a StatisticsMetaCache), + cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), transaction: &'a mut T, ) -> Executor<'a> { Box::new( @@ -76,8 +76,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for AddColumn { // Unique Index if let (Some(unique_values), Some(unique_meta)) = ( unique_values, - transaction - .table(cache.0, table_name.clone()) + throw!(transaction.table(cache.0, table_name.clone())) .and_then(|table| table.get_unique_index(&col_id)) .cloned(), ) { diff --git a/src/execution/ddl/create_index.rs b/src/execution/ddl/create_index.rs index 94c6dd13..b73394c2 100644 --- a/src/execution/ddl/create_index.rs +++ b/src/execution/ddl/create_index.rs @@ -4,7 +4,7 @@ use crate::execution::{build_read, Executor, WriteExecutor}; use crate::expression::ScalarExpression; use crate::planner::operator::create_index::CreateIndexOperator; use crate::planner::LogicalPlan; -use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache}; use crate::throw; use crate::types::index::Index; use crate::types::tuple::Tuple; @@ -28,7 +28,7 @@ impl From<(CreateIndexOperator, LogicalPlan)> for CreateIndex { impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CreateIndex { fn execute_mut( mut self, - cache: (&'a TableCache, &'a StatisticsMetaCache), + cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), transaction: &'a mut T, ) -> Executor<'a> { Box::new( diff --git a/src/execution/ddl/create_table.rs b/src/execution/ddl/create_table.rs index 1ad9555b..c2d2a7f4 100644 --- a/src/execution/ddl/create_table.rs +++ b/src/execution/ddl/create_table.rs @@ -1,6 +1,6 @@ use crate::execution::{Executor, WriteExecutor}; use crate::planner::operator::create_table::CreateTableOperator; -use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache}; use crate::throw; use crate::types::tuple_builder::TupleBuilder; @@ -17,7 +17,7 @@ impl From for CreateTable { impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CreateTable { fn execute_mut( self, - (table_cache, _): (&'a TableCache, &'a StatisticsMetaCache), + (table_cache, _, _): (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), transaction: &'a mut T, ) -> Executor<'a> { Box::new( diff --git a/src/execution/ddl/create_view.rs b/src/execution/ddl/create_view.rs index 072756c1..5fe81863 100644 --- 
+++ b/src/execution/ddl/create_view.rs
@@ -3,23 +3,21 @@ use crate::planner::operator::create_view::CreateViewOperator;
 use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache};
 use crate::throw;
 use crate::types::tuple_builder::TupleBuilder;
-use std::sync::Arc;
 
 pub struct CreateView {
     op: CreateViewOperator,
-    view_cache: Arc<ViewCache>,
 }
 
-impl From<(CreateViewOperator, Arc<ViewCache>)> for CreateView {
-    fn from((op, view_cache): (CreateViewOperator, Arc<ViewCache>)) -> Self {
-        CreateView { op, view_cache }
+impl From<CreateViewOperator> for CreateView {
+    fn from(op: CreateViewOperator) -> Self {
+        CreateView { op }
     }
 }
 
 impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CreateView {
     fn execute_mut(
         self,
-        _: (&'a TableCache, &'a StatisticsMetaCache),
+        (_, view_cache, _): (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
         transaction: &'a mut T,
     ) -> Executor<'a> {
         Box::new(
@@ -28,7 +26,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CreateView {
                 let CreateViewOperator { view, or_replace } = self.op;
 
                 let result_tuple = TupleBuilder::build_result(format!("{}", view.name));
-                throw!(transaction.create_view(&self.view_cache, view, or_replace));
+                throw!(transaction.create_view(view_cache, view, or_replace));
 
                 yield Ok(result_tuple);
             },
diff --git a/src/execution/ddl/drop_column.rs b/src/execution/ddl/drop_column.rs
index 30fd5e92..5aee0f16 100644
--- a/src/execution/ddl/drop_column.rs
+++ b/src/execution/ddl/drop_column.rs
@@ -2,7 +2,7 @@ use crate::errors::DatabaseError;
 use crate::execution::{build_read, Executor, WriteExecutor};
 use crate::planner::operator::alter_table::drop_column::DropColumnOperator;
 use crate::planner::LogicalPlan;
-use crate::storage::{StatisticsMetaCache, TableCache, Transaction};
+use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache};
 use crate::throw;
 use crate::types::tuple::Tuple;
 use crate::types::tuple_builder::TupleBuilder;
@@ -24,7 +24,7 @@ impl From<(DropColumnOperator, LogicalPlan)> for DropColumn {
 impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropColumn {
     fn execute_mut(
         mut self,
-        cache: (&'a TableCache, &'a StatisticsMetaCache),
+        cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
         transaction: &'a mut T,
     ) -> Executor<'a> {
         Box::new(
@@ -69,7 +69,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropColumn {
                     for tuple in tuples {
                         throw!(transaction.append_tuple(&table_name, tuple, &types, true));
                     }
-                    throw!(transaction.drop_column(cache.0, cache.1, &table_name, &column_name));
+                    throw!(transaction.drop_column(cache.0, cache.2, &table_name, &column_name));
 
                     yield Ok(TupleBuilder::build_result("1".to_string()));
                 } else if if_exists {
diff --git a/src/execution/ddl/drop_table.rs b/src/execution/ddl/drop_table.rs
index 3e949bf2..e80b862f 100644
--- a/src/execution/ddl/drop_table.rs
+++ b/src/execution/ddl/drop_table.rs
@@ -1,6 +1,6 @@
 use crate::execution::{Executor, WriteExecutor};
 use crate::planner::operator::drop_table::DropTableOperator;
-use crate::storage::{StatisticsMetaCache, TableCache, Transaction};
+use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache};
 use crate::throw;
 use crate::types::tuple_builder::TupleBuilder;
 
@@ -17,7 +17,7 @@ impl From<DropTableOperator> for DropTable {
 impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropTable {
     fn execute_mut(
         self,
-        (table_cache, _): (&'a TableCache, &'a StatisticsMetaCache),
+        (table_cache, _, _): (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
         transaction: &'a mut T,
     ) -> Executor<'a> {
        Box::new(
diff --git a/src/execution/ddl/truncate.rs b/src/execution/ddl/truncate.rs
index 0d57ac1e..d1e00f84 100644
--- a/src/execution/ddl/truncate.rs
+++ b/src/execution/ddl/truncate.rs
@@ -1,6 +1,6 @@
 use crate::execution::{Executor, WriteExecutor};
 use crate::planner::operator::truncate::TruncateOperator;
-use crate::storage::{StatisticsMetaCache, TableCache, Transaction};
+use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache};
 use crate::throw;
 use crate::types::tuple_builder::TupleBuilder;
 
@@ -17,7 +17,7 @@ impl From<TruncateOperator> for Truncate {
 impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Truncate {
     fn execute_mut(
         self,
-        _: (&'a TableCache, &'a StatisticsMetaCache),
+        _: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
         transaction: &'a mut T,
     ) -> Executor<'a> {
         Box::new(
diff --git a/src/execution/dml/analyze.rs b/src/execution/dml/analyze.rs
index 308992c0..32e4b625 100644
--- a/src/execution/dml/analyze.rs
+++ b/src/execution/dml/analyze.rs
@@ -6,7 +6,7 @@ use crate::optimizer::core::histogram::HistogramBuilder;
 use crate::optimizer::core::statistics_meta::StatisticsMeta;
 use crate::planner::operator::analyze::AnalyzeOperator;
 use crate::planner::LogicalPlan;
-use crate::storage::{StatisticsMetaCache, TableCache, Transaction};
+use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache};
 use crate::throw;
 use crate::types::index::IndexMetaRef;
 use crate::types::tuple::Tuple;
@@ -53,7 +53,7 @@ impl From<(AnalyzeOperator, LogicalPlan)> for Analyze {
 impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Analyze {
     fn execute_mut(
         self,
-        cache: (&'a TableCache, &'a StatisticsMetaCache),
+        cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
         transaction: &'a mut T,
     ) -> Executor<'a> {
         Box::new(
@@ -67,8 +67,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Analyze {
                 let schema = input.output_schema().clone();
                 let mut builders = Vec::with_capacity(index_metas.len());
-                let table = throw!(transaction
-                    .table(cache.0, table_name.clone())
+                let table = throw!(throw!(transaction.table(cache.0, table_name.clone()))
                     .cloned()
                     .ok_or(DatabaseError::TableNotFound));
 
@@ -122,7 +121,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Analyze {
                         ty: Utf8Type::Variable(None),
                         unit: CharLengthUnits::Characters,
                     }));
-                    throw!(transaction.save_table_meta(cache.1, &table_name, path_str, meta));
+                    throw!(transaction.save_table_meta(cache.2, &table_name, path_str, meta));
                     throw!(fs::rename(&temp_path, &path).map_err(DatabaseError::IO));
 
                     active_index_paths.insert(index_file);
@@ -133,7 +132,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Analyze {
                     let entry: DirEntry = throw!(entry.map_err(DatabaseError::IO));
 
                     if !active_index_paths.remove(&entry.file_name()) {
-                        throw!(fs::remove_file(&entry.path()).map_err(DatabaseError::IO));
+                        throw!(fs::remove_file(entry.path()).map_err(DatabaseError::IO));
                     }
                 }
 
diff --git a/src/execution/dml/copy_from_file.rs b/src/execution/dml/copy_from_file.rs
index db7b04d6..8e12ea1a 100644
--- a/src/execution/dml/copy_from_file.rs
+++ b/src/execution/dml/copy_from_file.rs
@@ -2,7 +2,7 @@ use crate::binder::copy::FileFormat;
 use crate::errors::DatabaseError;
 use crate::execution::{Executor, WriteExecutor};
 use crate::planner::operator::copy_from_file::CopyFromFileOperator;
-use crate::storage::{StatisticsMetaCache, TableCache, Transaction};
+use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache};
 use crate::throw;
 use crate::types::tuple::{types, Tuple};
 use crate::types::tuple_builder::TupleBuilder;
@@ -26,7 +26,7 @@ impl From<CopyFromFileOperator> for CopyFromFile {
 impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CopyFromFile {
     fn execute_mut(
         self,
-        _: (&'a TableCache, &'a StatisticsMetaCache),
+        _: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
         transaction: &'a mut T,
     ) -> Executor<'a> {
         Box::new(
@@ -201,8 +201,10 @@ mod tests {
         let storage = db.storage;
         let mut transaction = storage.transaction()?;
 
-        let mut coroutine =
-            executor.execute_mut((&db.table_cache, &db.meta_cache), &mut transaction);
+        let mut coroutine = executor.execute_mut(
+            (&db.table_cache, &db.view_cache, &db.meta_cache),
+            &mut transaction,
+        );
         let tuple = match Pin::new(&mut coroutine).resume(()) {
             CoroutineState::Yielded(tuple) => tuple,
             CoroutineState::Complete(()) => unreachable!(),
diff --git a/src/execution/dml/delete.rs b/src/execution/dml/delete.rs
index 9eaa9c62..6ab02785 100644
--- a/src/execution/dml/delete.rs
+++ b/src/execution/dml/delete.rs
@@ -5,7 +5,7 @@ use crate::execution::{build_read, Executor, WriteExecutor};
 use crate::expression::ScalarExpression;
 use crate::planner::operator::delete::DeleteOperator;
 use crate::planner::LogicalPlan;
-use crate::storage::{StatisticsMetaCache, TableCache, Transaction};
+use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache};
 use crate::throw;
 use crate::types::index::{Index, IndexId, IndexType};
 use crate::types::tuple::Tuple;
@@ -30,7 +30,7 @@ impl From<(DeleteOperator, LogicalPlan)> for Delete {
 impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Delete {
     fn execute_mut(
         self,
-        cache: (&'a TableCache, &'a StatisticsMetaCache),
+        cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
         transaction: &'a mut T,
     ) -> Executor<'a> {
         Box::new(
@@ -42,8 +42,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Delete {
                 } = self;
 
                 let schema = input.output_schema().clone();
-                let table = throw!(transaction
-                    .table(cache.0, table_name.clone())
+                let table = throw!(throw!(transaction.table(cache.0, table_name.clone()))
                     .cloned()
                     .ok_or(DatabaseError::TableNotFound));
                 let mut tuple_ids = Vec::new();
diff --git a/src/execution/dml/insert.rs b/src/execution/dml/insert.rs
index 22e9ca3b..fe81b67b 100644
--- a/src/execution/dml/insert.rs
+++ b/src/execution/dml/insert.rs
@@ -4,7 +4,7 @@ use crate::execution::dql::projection::Projection;
 use crate::execution::{build_read, Executor, WriteExecutor};
 use crate::planner::operator::insert::InsertOperator;
 use crate::planner::LogicalPlan;
-use crate::storage::{StatisticsMetaCache, TableCache, Transaction};
+use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache};
 use crate::throw;
 use crate::types::index::Index;
 use crate::types::tuple::Tuple;
@@ -63,7 +63,7 @@ impl ColumnCatalog {
 impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Insert {
     fn execute_mut(
         self,
-        cache: (&'a TableCache, &'a StatisticsMetaCache),
+        cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
         transaction: &'a mut T,
     ) -> Executor<'a> {
         Box::new(
@@ -83,9 +83,10 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Insert {
                     .iter()
                     .find(|col| col.desc().is_primary)
                     .map(|col| col.key(is_mapping_by_name))
-                    .ok_or_else(|| DatabaseError::NotNull));
+                    .ok_or(DatabaseError::NotNull));
 
-                if let Some(table_catalog) = transaction.table(cache.0, table_name.clone()).cloned()
+                if let Some(table_catalog) =
+                    throw!(transaction.table(cache.0, table_name.clone())).cloned()
                 {
                     let types = table_catalog.types();
                    let mut coroutine = build_read(input, cache, transaction);
diff --git a/src/execution/dml/update.rs b/src/execution/dml/update.rs
index 68c787b4..93d2f966 100644
--- a/src/execution/dml/update.rs
+++ b/src/execution/dml/update.rs
@@ -3,7 +3,7 @@ use crate::execution::dql::projection::Projection;
 use crate::execution::{build_read, Executor, WriteExecutor};
 use crate::planner::operator::update::UpdateOperator;
 use crate::planner::LogicalPlan;
-use crate::storage::{StatisticsMetaCache, TableCache, Transaction};
+use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache};
 use crate::throw;
 use crate::types::index::Index;
 use crate::types::tuple::types;
@@ -35,7 +35,7 @@ impl From<(UpdateOperator, LogicalPlan, LogicalPlan)> for Update {
 impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Update {
     fn execute_mut(
         self,
-        cache: (&'a TableCache, &'a StatisticsMetaCache),
+        cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
         transaction: &'a mut T,
     ) -> Executor<'a> {
         Box::new(
@@ -51,7 +51,8 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Update {
                 let input_schema = input.output_schema().clone();
                 let types = types(&input_schema);
 
-                if let Some(table_catalog) = transaction.table(cache.0, table_name.clone()).cloned()
+                if let Some(table_catalog) =
+                    throw!(transaction.table(cache.0, table_name.clone())).cloned()
                 {
                     let mut value_map = HashMap::new();
                     let mut tuples = Vec::new();
diff --git a/src/execution/dql/aggregate/hash_agg.rs b/src/execution/dql/aggregate/hash_agg.rs
index dd1d3b3f..484bd08d 100644
--- a/src/execution/dql/aggregate/hash_agg.rs
+++ b/src/execution/dql/aggregate/hash_agg.rs
@@ -5,7 +5,7 @@ use crate::execution::{build_read, Executor, ReadExecutor};
 use crate::expression::ScalarExpression;
 use crate::planner::operator::aggregate::AggregateOperator;
 use crate::planner::LogicalPlan;
-use crate::storage::{StatisticsMetaCache, TableCache, Transaction};
+use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache};
 use crate::throw;
 use crate::types::tuple::{SchemaRef, Tuple};
 use crate::types::value::ValueRef;
@@ -42,7 +42,7 @@ impl From<(AggregateOperator, LogicalPlan)> for HashAggExecutor {
 impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for HashAggExecutor {
     fn execute(
         self,
-        cache: (&'a TableCache, &'a StatisticsMetaCache),
+        cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
         transaction: &'a T,
     ) -> Executor<'a> {
         Box::new(
@@ -184,6 +184,7 @@ mod test {
     #[test]
     fn test_hash_agg() -> Result<(), DatabaseError> {
         let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
+        let view_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
         let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
         let temp_dir = TempDir::new().expect("unable to create temporary working directory");
 
@@ -241,7 +242,7 @@ mod test {
         let tuples = try_collect(
             HashAggExecutor::from((operator, input))
-                .execute((&table_cache, &meta_cache), &transaction),
+                .execute((&table_cache, &view_cache, &meta_cache), &transaction),
         )?;
 
         println!(
diff --git a/src/execution/dql/aggregate/simple_agg.rs b/src/execution/dql/aggregate/simple_agg.rs
index e3d159a0..368cbc54 100644
--- a/src/execution/dql/aggregate/simple_agg.rs
+++ b/src/execution/dql/aggregate/simple_agg.rs
@@ -3,7 +3,7 @@ use crate::execution::{build_read, Executor, ReadExecutor};
 use crate::expression::ScalarExpression;
 use crate::planner::operator::aggregate::AggregateOperator;
 use crate::planner::LogicalPlan;
-use crate::storage::{StatisticsMetaCache, TableCache, Transaction};
+use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache};
 use crate::throw;
 use crate::types::tuple::Tuple;
 use crate::types::value::ValueRef;
@@ -28,7 +28,7 @@ impl From<(AggregateOperator, LogicalPlan)> for SimpleAggExecutor {
 impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for SimpleAggExecutor {
     fn execute(
         self,
-        cache: (&'a TableCache, &'a StatisticsMetaCache),
+        cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
         transaction: &'a T,
     ) -> Executor<'a> {
         Box::new(
diff --git a/src/execution/dql/describe.rs b/src/execution/dql/describe.rs
index 117195b9..1b8a0a17 100644
--- a/src/execution/dql/describe.rs
+++ b/src/execution/dql/describe.rs
@@ -2,7 +2,7 @@ use crate::catalog::{ColumnCatalog, TableName};
 use crate::execution::DatabaseError;
 use crate::execution::{Executor, ReadExecutor};
 use crate::planner::operator::describe::DescribeOperator;
-use crate::storage::{StatisticsMetaCache, TableCache, Transaction};
+use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache};
 use crate::throw;
 use crate::types::tuple::Tuple;
 use crate::types::value::{DataValue, Utf8Type, ValueRef};
@@ -43,14 +43,13 @@ impl From<DescribeOperator> for Describe {
 impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Describe {
     fn execute(
         self,
-        cache: (&'a TableCache, &'a StatisticsMetaCache),
+        cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
         transaction: &'a T,
     ) -> Executor<'a> {
         Box::new(
             #[coroutine]
             move || {
-                let table = throw!(transaction
-                    .table(cache.0, self.table_name.clone())
+                let table = throw!(throw!(transaction.table(cache.0, self.table_name.clone()))
                     .ok_or(DatabaseError::TableNotFound));
                 let key_fn = |column: &ColumnCatalog| {
                     if column.desc().is_primary {
diff --git a/src/execution/dql/dummy.rs b/src/execution/dql/dummy.rs
index 57e267c0..3c4acd40 100644
--- a/src/execution/dql/dummy.rs
+++ b/src/execution/dql/dummy.rs
@@ -1,11 +1,15 @@
 use crate::execution::{Executor, ReadExecutor};
-use crate::storage::{StatisticsMetaCache, TableCache, Transaction};
+use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache};
 use crate::types::tuple::Tuple;
 
 pub struct Dummy {}
 
 impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Dummy {
-    fn execute(self, _: (&'a TableCache, &'a StatisticsMetaCache), _: &T) -> Executor<'a> {
+    fn execute(
+        self,
+        _: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
+        _: &'a T,
+    ) -> Executor<'a> {
         Box::new(
             #[coroutine]
             move || {
diff --git a/src/execution/dql/explain.rs b/src/execution/dql/explain.rs
index f1badc7b..8c5fbdf0 100644
--- a/src/execution/dql/explain.rs
+++ b/src/execution/dql/explain.rs
@@ -1,6 +1,6 @@
 use crate::execution::{Executor, ReadExecutor};
 use crate::planner::LogicalPlan;
-use crate::storage::{StatisticsMetaCache, TableCache, Transaction};
+use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache};
 use crate::types::tuple::Tuple;
 use crate::types::value::{DataValue, Utf8Type};
 use sqlparser::ast::CharLengthUnits;
@@ -17,7 +17,11 @@ impl From<LogicalPlan> for Explain {
 }
 
 impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Explain {
-    fn execute(self, _: (&'a TableCache, &'a StatisticsMetaCache), _: &T) -> Executor<'a> {
+    fn execute(
+        self,
+        _: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
+        _: &'a T,
+    ) -> Executor<'a> {
         Box::new(
             #[coroutine]
             move || {
diff --git a/src/execution/dql/filter.rs b/src/execution/dql/filter.rs
index f72098c3..a4132142 100644
--- a/src/execution/dql/filter.rs
a/src/execution/dql/filter.rs +++ b/src/execution/dql/filter.rs @@ -2,7 +2,7 @@ use crate::execution::{build_read, Executor, ReadExecutor}; use crate::expression::ScalarExpression; use crate::planner::operator::filter::FilterOperator; use crate::planner::LogicalPlan; -use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache}; use crate::throw; use std::ops::Coroutine; use std::ops::CoroutineState; @@ -22,7 +22,7 @@ impl From<(FilterOperator, LogicalPlan)> for Filter { impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Filter { fn execute( self, - cache: (&'a TableCache, &'a StatisticsMetaCache), + cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), transaction: &'a T, ) -> Executor<'a> { Box::new( diff --git a/src/execution/dql/function_scan.rs b/src/execution/dql/function_scan.rs index a9cfa5bc..410312e3 100644 --- a/src/execution/dql/function_scan.rs +++ b/src/execution/dql/function_scan.rs @@ -1,7 +1,7 @@ use crate::execution::{Executor, ReadExecutor}; use crate::expression::function::table::TableFunction; use crate::planner::operator::function_scan::FunctionScanOperator; -use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache}; use crate::throw; pub struct FunctionScan { @@ -17,7 +17,11 @@ impl From<FunctionScanOperator> for FunctionScan { } impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for FunctionScan { - fn execute(self, _: (&'a TableCache, &'a StatisticsMetaCache), _: &T) -> Executor<'a> { + fn execute( + self, + _: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), + _: &'a T, + ) -> Executor<'a> { Box::new( #[coroutine] move || { diff --git a/src/execution/dql/index_scan.rs b/src/execution/dql/index_scan.rs index 7ace9b4d..e1027280 100644 --- a/src/execution/dql/index_scan.rs +++ b/src/execution/dql/index_scan.rs @@ -1,7 +1,7 @@ use crate::execution::{Executor, ReadExecutor}; use crate::expression::range_detacher::Range; use crate::planner::operator::table_scan::TableScanOperator; -use crate::storage::{Iter, StatisticsMetaCache, TableCache, Transaction}; +use crate::storage::{Iter, StatisticsMetaCache, TableCache, Transaction, ViewCache}; use crate::throw; use crate::types::index::IndexMetaRef; @@ -29,7 +29,7 @@ impl From<(TableScanOperator, IndexMetaRef, Range)> for IndexScan { impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for IndexScan { fn execute( self, - (table_cache, _): (&'a TableCache, &'a StatisticsMetaCache), + (table_cache, _, _): (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), transaction: &'a T, ) -> Executor<'a> { Box::new( diff --git a/src/execution/dql/join/hash_join.rs b/src/execution/dql/join/hash_join.rs index 460a832e..b5746e74 100644 --- a/src/execution/dql/join/hash_join.rs +++ b/src/execution/dql/join/hash_join.rs @@ -5,7 +5,7 @@ use crate::execution::{build_read, Executor, ReadExecutor}; use crate::expression::ScalarExpression; use crate::planner::operator::join::{JoinCondition, JoinOperator, JoinType}; use crate::planner::LogicalPlan; -use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache}; use crate::throw; use crate::types::tuple::{Schema, SchemaRef, Tuple}; use crate::types::value::{DataValue, ValueRef, NULL_VALUE}; @@ -44,7 +44,7 @@ impl From<(JoinOperator, LogicalPlan, LogicalPlan)> for HashJoin { impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for HashJoin { fn 
execute( self, - cache: (&'a TableCache, &'a StatisticsMetaCache), + cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), transaction: &'a T, ) -> Executor<'a> { Box::new( @@ -528,6 +528,7 @@ mod test { let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + let view_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let (keys, left, right) = build_join_values(); @@ -538,8 +539,8 @@ mod test { }, join_type: JoinType::Inner, }; - let executor = - HashJoin::from((op, left, right)).execute((&table_cache, &meta_cache), &transaction); + let executor = HashJoin::from((op, left, right)) + .execute((&table_cache, &view_cache, &meta_cache), &transaction); let tuples = try_collect(executor)?; debug_assert_eq!(tuples.len(), 3); @@ -566,6 +567,7 @@ mod test { let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + let view_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let (keys, left, right) = build_join_values(); @@ -579,7 +581,9 @@ mod test { //Outer { let executor = HashJoin::from((op.clone(), left.clone(), right.clone())); - let tuples = try_collect(executor.execute((&table_cache, &meta_cache), &transaction))?; + let tuples = try_collect( + executor.execute((&table_cache, &view_cache, &meta_cache), &transaction), + )?; debug_assert_eq!(tuples.len(), 4); @@ -604,8 +608,9 @@ mod test { { let mut executor = HashJoin::from((op.clone(), left.clone(), right.clone())); executor.ty = JoinType::LeftSemi; - let mut tuples = - try_collect(executor.execute((&table_cache, &meta_cache), &transaction))?; + let mut tuples = try_collect( + executor.execute((&table_cache, &view_cache, &meta_cache), &transaction), + )?; debug_assert_eq!(tuples.len(), 2); tuples.sort_by_key(|tuple| { @@ -627,7 +632,9 @@ mod test { { let mut executor = HashJoin::from((op, left, right)); executor.ty = JoinType::LeftAnti; - let tuples = try_collect(executor.execute((&table_cache, &meta_cache), &transaction))?; + let tuples = try_collect( + executor.execute((&table_cache, &view_cache, &meta_cache), &transaction), + )?; debug_assert_eq!(tuples.len(), 1); debug_assert_eq!( @@ -645,6 +652,7 @@ mod test { let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + let view_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let (keys, left, right) = build_join_values(); @@ -655,8 +663,8 @@ mod test { }, join_type: JoinType::RightOuter, }; - let executor = - HashJoin::from((op, left, right)).execute((&table_cache, &meta_cache), &transaction); + let executor = HashJoin::from((op, left, right)) + .execute((&table_cache, &view_cache, &meta_cache), &transaction); let tuples = try_collect(executor)?; debug_assert_eq!(tuples.len(), 4); @@ -687,6 +695,7 @@ mod test { let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + let view_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let 
table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let (keys, left, right) = build_join_values(); @@ -697,8 +706,8 @@ mod test { }, join_type: JoinType::Full, }; - let executor = - HashJoin::from((op, left, right)).execute((&table_cache, &meta_cache), &transaction); + let executor = HashJoin::from((op, left, right)) + .execute((&table_cache, &view_cache, &meta_cache), &transaction); let tuples = try_collect(executor)?; debug_assert_eq!(tuples.len(), 5); diff --git a/src/execution/dql/join/nested_loop_join.rs b/src/execution/dql/join/nested_loop_join.rs index 0c07b407..faaa7176 100644 --- a/src/execution/dql/join/nested_loop_join.rs +++ b/src/execution/dql/join/nested_loop_join.rs @@ -9,7 +9,7 @@ use crate::execution::{build_read, Executor, ReadExecutor}; use crate::expression::ScalarExpression; use crate::planner::operator::join::{JoinCondition, JoinOperator, JoinType}; use crate::planner::LogicalPlan; -use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache}; use crate::throw; use crate::types::tuple::{Schema, SchemaRef, Tuple}; use crate::types::value::{DataValue, NULL_VALUE}; @@ -128,7 +128,7 @@ impl From<(JoinOperator, LogicalPlan, LogicalPlan)> for NestedLoopJoin { impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for NestedLoopJoin { fn execute( self, - cache: (&'a TableCache, &'a StatisticsMetaCache), + cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), transaction: &'a T, ) -> Executor<'a> { Box::new( @@ -168,9 +168,13 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for NestedLoopJoin { throw!(eq_cond.equals(&left_tuple, &right_tuple)), ) { (None, true) if matches!(ty, JoinType::RightOuter) => { + has_matched = true; Self::emit_tuple(&right_tuple, &left_tuple, ty, true) } - (None, true) => Self::emit_tuple(&left_tuple, &right_tuple, ty, true), + (None, true) => { + has_matched = true; + Self::emit_tuple(&left_tuple, &right_tuple, ty, true) + } (Some(filter), true) => { let new_tuple = Self::merge_tuple(&left_tuple, &right_tuple, &ty); let value = throw!(filter.eval(&new_tuple, &output_schema_ref)); @@ -537,6 +541,7 @@ mod test { let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + let view_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let (keys, left, right, filter) = build_join_values(true); let op = JoinOperator { @@ -547,7 +552,7 @@ mod test { join_type: JoinType::Inner, }; let executor = NestedLoopJoin::from((op, left, right)) - .execute((&table_cache, &meta_cache), &transaction); + .execute((&table_cache, &view_cache, &meta_cache), &transaction); let tuples = try_collect(executor)?; let mut expected_set = HashSet::with_capacity(1); @@ -565,6 +570,7 @@ mod test { let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + let view_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let (keys, left, right, filter) = build_join_values(true); let op = JoinOperator { @@ -575,7 +581,7 @@ mod test { join_type: JoinType::LeftOuter, }; let executor = NestedLoopJoin::from((op, left, right)) - .execute((&table_cache, &meta_cache), 
&transaction); + .execute((&table_cache, &view_cache, &meta_cache), &transaction); let tuples = try_collect(executor)?; debug_assert_eq!( @@ -605,6 +611,7 @@ mod test { let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + let view_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let (keys, left, right, filter) = build_join_values(true); let op = JoinOperator { @@ -615,7 +622,7 @@ mod test { join_type: JoinType::Cross, }; let executor = NestedLoopJoin::from((op, left, right)) - .execute((&table_cache, &meta_cache), &transaction); + .execute((&table_cache, &view_cache, &meta_cache), &transaction); let tuples = try_collect(executor)?; let mut expected_set = HashSet::with_capacity(1); @@ -634,6 +641,7 @@ mod test { let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + let view_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let (keys, left, right, _) = build_join_values(true); let op = JoinOperator { @@ -644,7 +652,7 @@ mod test { join_type: JoinType::Cross, }; let executor = NestedLoopJoin::from((op, left, right)) - .execute((&table_cache, &meta_cache), &transaction); + .execute((&table_cache, &view_cache, &meta_cache), &transaction); let tuples = try_collect(executor)?; let mut expected_set = HashSet::with_capacity(3); @@ -666,6 +674,7 @@ mod test { let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + let view_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let (keys, left, right, _) = build_join_values(false); let op = JoinOperator { @@ -676,7 +685,7 @@ mod test { join_type: JoinType::Cross, }; let executor = NestedLoopJoin::from((op, left, right)) - .execute((&table_cache, &meta_cache), &transaction); + .execute((&table_cache, &view_cache, &meta_cache), &transaction); let tuples = try_collect(executor)?; debug_assert_eq!(tuples.len(), 16); @@ -690,6 +699,7 @@ mod test { let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + let view_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let (keys, left, right, filter) = build_join_values(true); let op = JoinOperator { @@ -700,7 +710,7 @@ mod test { join_type: JoinType::LeftSemi, }; let executor = NestedLoopJoin::from((op, left, right)) - .execute((&table_cache, &meta_cache), &transaction); + .execute((&table_cache, &view_cache, &meta_cache), &transaction); let tuples = try_collect(executor)?; let mut expected_set = HashSet::with_capacity(1); @@ -717,6 +727,7 @@ mod test { let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + let view_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let 
(keys, left, right, filter) = build_join_values(true); let op = JoinOperator { @@ -727,7 +738,7 @@ mod test { join_type: JoinType::LeftAnti, }; let executor = NestedLoopJoin::from((op, left, right)) - .execute((&table_cache, &meta_cache), &transaction); + .execute((&table_cache, &view_cache, &meta_cache), &transaction); let tuples = try_collect(executor)?; let mut expected_set = HashSet::with_capacity(3); @@ -746,6 +757,7 @@ mod test { let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + let view_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let (keys, left, right, filter) = build_join_values(true); let op = JoinOperator { @@ -756,7 +768,7 @@ mod test { join_type: JoinType::RightOuter, }; let executor = NestedLoopJoin::from((op, left, right)) - .execute((&table_cache, &meta_cache), &transaction); + .execute((&table_cache, &view_cache, &meta_cache), &transaction); let tuples = try_collect(executor)?; let mut expected_set = HashSet::with_capacity(4); @@ -780,6 +792,7 @@ mod test { let storage = RocksStorage::new(temp_dir.path())?; let transaction = storage.transaction()?; let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); + let view_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let (keys, left, right, filter) = build_join_values(true); let op = JoinOperator { @@ -790,7 +803,7 @@ mod test { join_type: JoinType::Full, }; let executor = NestedLoopJoin::from((op, left, right)) - .execute((&table_cache, &meta_cache), &transaction); + .execute((&table_cache, &view_cache, &meta_cache), &transaction); let tuples = try_collect(executor)?; debug_assert_eq!( diff --git a/src/execution/dql/limit.rs b/src/execution/dql/limit.rs index 46a8d69a..cbb0eb1c 100644 --- a/src/execution/dql/limit.rs +++ b/src/execution/dql/limit.rs @@ -1,7 +1,7 @@ use crate::execution::{build_read, Executor, ReadExecutor}; use crate::planner::operator::limit::LimitOperator; use crate::planner::LogicalPlan; -use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache}; use std::ops::Coroutine; use std::ops::CoroutineState; use std::pin::Pin; @@ -25,7 +25,7 @@ impl From<(LimitOperator, LogicalPlan)> for Limit { impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Limit { fn execute( self, - cache: (&'a TableCache, &'a StatisticsMetaCache), + cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), transaction: &'a T, ) -> Executor<'a> { Box::new( diff --git a/src/execution/dql/projection.rs b/src/execution/dql/projection.rs index e0ac6e85..7ec0e056 100644 --- a/src/execution/dql/projection.rs +++ b/src/execution/dql/projection.rs @@ -4,7 +4,7 @@ use crate::execution::{build_read, Executor, ReadExecutor}; use crate::expression::ScalarExpression; use crate::planner::operator::project::ProjectOperator; use crate::planner::LogicalPlan; -use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache}; use crate::throw; use crate::types::tuple::Tuple; use crate::types::value::ValueRef; @@ -26,7 +26,7 @@ impl From<(ProjectOperator, LogicalPlan)> for Projection { impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for 
Projection { fn execute( self, - cache: (&'a TableCache, &'a StatisticsMetaCache), + cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), transaction: &'a T, ) -> Executor<'a> { Box::new( diff --git a/src/execution/dql/seq_scan.rs b/src/execution/dql/seq_scan.rs index 2eb309a5..2a304dc6 100644 --- a/src/execution/dql/seq_scan.rs +++ b/src/execution/dql/seq_scan.rs @@ -1,6 +1,6 @@ use crate::execution::{Executor, ReadExecutor}; use crate::planner::operator::table_scan::TableScanOperator; -use crate::storage::{Iter, StatisticsMetaCache, TableCache, Transaction}; +use crate::storage::{Iter, StatisticsMetaCache, TableCache, Transaction, ViewCache}; use crate::throw; pub(crate) struct SeqScan { @@ -16,7 +16,7 @@ impl From<TableScanOperator> for SeqScan { impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for SeqScan { fn execute( self, - (table_cache, _): (&'a TableCache, &'a StatisticsMetaCache), + (table_cache, _, _): (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), transaction: &'a T, ) -> Executor<'a> { Box::new( diff --git a/src/execution/dql/show_table.rs b/src/execution/dql/show_table.rs index d03af33d..9b773c7f 100644 --- a/src/execution/dql/show_table.rs +++ b/src/execution/dql/show_table.rs @@ -1,6 +1,6 @@ use crate::catalog::TableMeta; use crate::execution::{Executor, ReadExecutor}; -use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache}; use crate::throw; use crate::types::tuple::Tuple; use crate::types::value::{DataValue, Utf8Type}; @@ -12,7 +12,7 @@ pub struct ShowTables; impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for ShowTables { fn execute( self, - _: (&'a TableCache, &'a StatisticsMetaCache), + _: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), transaction: &'a T, ) -> Executor<'a> { Box::new( diff --git a/src/execution/dql/sort.rs b/src/execution/dql/sort.rs index 51635f85..fdd1212b 100644 --- a/src/execution/dql/sort.rs +++ b/src/execution/dql/sort.rs @@ -2,7 +2,7 @@ use crate::errors::DatabaseError; use crate::execution::{build_read, Executor, ReadExecutor}; use crate::planner::operator::sort::{SortField, SortOperator}; use crate::planner::LogicalPlan; -use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache}; use crate::throw; use crate::types::tuple::{Schema, Tuple}; use itertools::Itertools; @@ -227,7 +227,7 @@ impl From<(SortOperator, LogicalPlan)> for Sort { impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Sort { fn execute( self, - cache: (&'a TableCache, &'a StatisticsMetaCache), + cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), transaction: &'a T, ) -> Executor<'a> { Box::new( diff --git a/src/execution/dql/union.rs b/src/execution/dql/union.rs index b34e2e79..9cb3409e 100644 --- a/src/execution/dql/union.rs +++ b/src/execution/dql/union.rs @@ -1,6 +1,6 @@ use crate::execution::{build_read, Executor, ReadExecutor}; use crate::planner::LogicalPlan; -use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache}; use std::ops::Coroutine; use std::ops::CoroutineState; use std::pin::Pin; @@ -22,7 +22,7 @@ impl From<(LogicalPlan, LogicalPlan)> for Union { impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Union { fn execute( self, - cache: (&'a TableCache, &'a StatisticsMetaCache), + cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), transaction: &'a T, ) 
-> Executor<'a> { Box::new( diff --git a/src/execution/dql/values.rs b/src/execution/dql/values.rs index 22f58eef..84a73dd8 100644 --- a/src/execution/dql/values.rs +++ b/src/execution/dql/values.rs @@ -1,6 +1,6 @@ use crate::execution::{Executor, ReadExecutor}; use crate::planner::operator::values::ValuesOperator; -use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache}; use crate::types::tuple::Tuple; pub struct Values { @@ -14,7 +14,11 @@ impl From<ValuesOperator> for Values { } impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Values { - fn execute(self, _: (&'a TableCache, &'a StatisticsMetaCache), _: &T) -> Executor<'a> { + fn execute( + self, + _: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), + _: &'a T, + ) -> Executor<'a> { Box::new( #[coroutine] move || { diff --git a/src/execution/mod.rs b/src/execution/mod.rs index 7cd5c75c..bf783963 100644 --- a/src/execution/mod.rs +++ b/src/execution/mod.rs @@ -8,6 +8,7 @@ use self::dql::join::nested_loop_join::NestedLoopJoin; use crate::errors::DatabaseError; use crate::execution::ddl::create_index::CreateIndex; use crate::execution::ddl::create_table::CreateTable; +use crate::execution::ddl::create_view::CreateView; use crate::execution::ddl::drop_column::DropColumn; use crate::execution::ddl::drop_table::DropTable; use crate::execution::ddl::truncate::Truncate; @@ -35,7 +36,7 @@ use crate::execution::dql::values::Values; use crate::planner::operator::join::JoinCondition; use crate::planner::operator::{Operator, PhysicalOption}; use crate::planner::LogicalPlan; -use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache}; use crate::types::index::IndexInfo; use crate::types::tuple::Tuple; use std::ops::{Coroutine, CoroutineState}; @@ -47,7 +48,7 @@ pub type Executor<'a> = pub trait ReadExecutor<'a, T: Transaction + 'a> { fn execute( self, - cache: (&'a TableCache, &'a StatisticsMetaCache), + cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), transaction: &'a T, ) -> Executor<'a>; } @@ -55,14 +56,14 @@ pub trait WriteExecutor<'a, T: Transaction + 'a> { fn execute_mut( self, - cache: (&'a TableCache, &'a StatisticsMetaCache), + cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), transaction: &'a mut T, ) -> Executor<'a>; } pub fn build_read<'a, T: Transaction + 'a>( plan: LogicalPlan, - cache: (&'a TableCache, &'a StatisticsMetaCache), + cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), transaction: &'a T, ) -> Executor<'a> { let LogicalPlan { @@ -149,7 +150,7 @@ pub fn build_read<'a, T: Transaction + 'a>( pub fn build_write<'a, T: Transaction + 'a>( plan: LogicalPlan, - cache: (&'a TableCache, &'a StatisticsMetaCache), + cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), transaction: &'a mut T, ) -> Executor<'a> { let LogicalPlan { @@ -190,6 +191,7 @@ pub fn build_write<'a, T: Transaction + 'a>( CreateIndex::from((op, input)).execute_mut(cache, transaction) } + Operator::CreateView(op) => CreateView::from(op).execute_mut(cache, transaction), Operator::DropTable(op) => DropTable::from(op).execute_mut(cache, transaction), Operator::Truncate(op) => Truncate::from(op).execute_mut(cache, transaction), Operator::CopyFromFile(op) => CopyFromFile::from(op).execute_mut(cache, transaction), diff --git a/src/expression/evaluator.rs b/src/expression/evaluator.rs index 
8fabd971..d09d69cb 100644 --- a/src/expression/evaluator.rs +++ b/src/expression/evaluator.rs @@ -267,13 +267,11 @@ impl ScalarExpression { unit: CharLengthUnits::Characters, })) } - ScalarExpression::Reference { pos, .. } => { - return Ok(tuple - .values - .get(*pos) - .unwrap_or_else(|| &NULL_VALUE) - .clone()); - } + ScalarExpression::Reference { pos, .. } => Ok(tuple + .values + .get(*pos) + .unwrap_or_else(|| &NULL_VALUE) + .clone()), ScalarExpression::Tuple(exprs) => { let mut values = Vec::with_capacity(exprs.len()); diff --git a/src/expression/mod.rs b/src/expression/mod.rs index 3d8ea01d..252a38b7 100644 --- a/src/expression/mod.rs +++ b/src/expression/mod.rs @@ -1347,7 +1347,7 @@ mod test { let mut reference_tables = ReferenceTables::new(); let c3_column_id = { let table = transaction - .table(&table_cache, Arc::new("t1".to_string())) + .table(&table_cache, Arc::new("t1".to_string()))? .unwrap(); *table.get_column_id_by_name("c3").unwrap() }; diff --git a/src/function/numbers.rs b/src/function/numbers.rs index 7680c739..24ef5237 100644 --- a/src/function/numbers.rs +++ b/src/function/numbers.rs @@ -64,7 +64,7 @@ impl TableFunctionImpl for Numbers { if value.logical_type() != LogicalType::Integer { value = Arc::new(DataValue::clone(&value).cast(&LogicalType::Integer)?); } - let num = value.i32().ok_or_else(|| DatabaseError::NotNull)?; + let num = value.i32().ok_or(DatabaseError::NotNull)?; Ok(Box::new((0..num).map(|i| { Ok(Tuple { diff --git a/src/lib.rs b/src/lib.rs index f0d320c8..dc316679 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -92,7 +92,6 @@ #![feature(coroutine_trait)] #![feature(iterator_try_collect)] #![feature(slice_pattern)] -#![feature(is_sorted)] #![feature(stmt_expr_attributes)] extern crate core; diff --git a/src/optimizer/core/memo.rs b/src/optimizer/core/memo.rs index b6ba9715..6b7bebb0 100644 --- a/src/optimizer/core/memo.rs +++ b/src/optimizer/core/memo.rs @@ -118,7 +118,7 @@ mod tests { let transaction = database.storage.transaction()?; let c1_column_id = { transaction - .table(&database.table_cache, Arc::new("t1".to_string())) + .table(&database.table_cache, Arc::new("t1".to_string()))? .unwrap() .get_column_id_by_name("c1") .unwrap() @@ -128,6 +128,7 @@ mod tests { let mut binder = Binder::new( BinderContext::new( &database.table_cache, + &database.view_cache, &transaction, &scala_functions, &table_functions, diff --git a/src/optimizer/rule/normalization/pushdown_limit.rs b/src/optimizer/rule/normalization/pushdown_limit.rs index 3d804160..9a7b3e6e 100644 --- a/src/optimizer/rule/normalization/pushdown_limit.rs +++ b/src/optimizer/rule/normalization/pushdown_limit.rs @@ -67,7 +67,7 @@ impl NormalizationRule for LimitProjectTranspose { /// Add extra limits below JOIN: /// 1. For LEFT OUTER and RIGHT OUTER JOIN, we push limits to the left and right sides, -/// respectively. +/// respectively. /// /// TODO: 2. For INNER and CROSS JOIN, we push limits to both the left and right sides /// TODO: if join condition is empty. 
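A note on the executor changes above: every `ReadExecutor::execute` and `WriteExecutor::execute_mut` implementation in this patch changes in the same mechanical way. The shared cache argument widens from the pair `(&'a TableCache, &'a StatisticsMetaCache)` to the triple `(&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache)`, so that views can be resolved as data sources wherever a plan is executed. A minimal call-site sketch under the new signature, modeled on the tests in this patch (the capacity arguments and `RandomState` hasher simply mirror the test setup and are not required values):

    let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
    let view_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
    let meta_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);

    // The tuple order is fixed everywhere: (table, view, statistics-meta).
    // Executors that never touch views just ignore the middle element;
    // SeqScan and IndexScan, for example, destructure it as (table_cache, _, _).
    let tuples = try_collect(
        HashJoin::from((op, left, right))
            .execute((&table_cache, &view_cache, &meta_cache), &transaction),
    )?;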
diff --git a/src/planner/mod.rs b/src/planner/mod.rs index 15ce0e55..805f7229 100644 --- a/src/planner/mod.rs +++ b/src/planner/mod.rs @@ -5,11 +5,17 @@ use crate::planner::operator::join::JoinType; use crate::planner::operator::union::UnionOperator; use crate::planner::operator::values::ValuesOperator; use crate::planner::operator::{Operator, PhysicalOption}; -use crate::types::tuple::SchemaRef; +use crate::types::tuple::{Schema, SchemaRef}; use itertools::Itertools; use serde_macros::ReferenceSerialization; use std::sync::Arc; +#[derive(Debug, Clone)] +pub(crate) enum SchemaOutput { + Schema(Schema), + SchemaRef(SchemaRef), +} + #[derive(Debug, PartialEq, Eq, Clone, Hash, ReferenceSerialization)] pub struct LogicalPlan { pub(crate) operator: Operator, @@ -19,6 +25,15 @@ pub struct LogicalPlan { pub(crate) _output_schema_ref: Option<SchemaRef>, } +impl SchemaOutput { + pub(crate) fn columns(&self) -> impl Iterator<Item = &ColumnRef> { + match self { + SchemaOutput::Schema(schema) => schema.iter(), + SchemaOutput::SchemaRef(schema_ref) => schema_ref.iter(), + } + } +} + impl LogicalPlan { pub fn new(operator: Operator, childrens: Vec<LogicalPlan>) -> Self { Self { @@ -48,108 +63,120 @@ impl LogicalPlan { tables } - pub fn output_schema(&mut self) -> &SchemaRef { - self._output_schema_ref - .get_or_insert_with(|| match &self.operator { - Operator::Filter(_) | Operator::Sort(_) | Operator::Limit(_) => { - self.childrens[0].output_schema().clone() - } - Operator::Aggregate(op) => { - let out_columns = op - .agg_calls - .iter() - .chain(op.groupby_exprs.iter()) - .map(|expr| expr.output_column()) - .collect_vec(); - Arc::new(out_columns) + pub(crate) fn _output_schema_direct( + operator: &Operator, + childrens: &[LogicalPlan], + ) -> SchemaOutput { + match operator { + Operator::Filter(_) | Operator::Sort(_) | Operator::Limit(_) => { + childrens[0].output_schema_direct() + } + Operator::Aggregate(op) => SchemaOutput::Schema( + op.agg_calls + .iter() + .chain(op.groupby_exprs.iter()) + .map(|expr| expr.output_column()) + .collect_vec(), + ), + Operator::Join(op) => { + if matches!(op.join_type, JoinType::LeftSemi | JoinType::LeftAnti) { + return childrens[0].output_schema_direct(); } - Operator::Join(op) => { - if matches!(op.join_type, JoinType::LeftSemi | JoinType::LeftAnti) { - return self.childrens[0].output_schema().clone(); + let mut columns = Vec::new(); + + for plan in childrens.iter() { + for column in plan.output_schema_direct().columns() { + columns.push(column.clone()); } - let out_columns = self - .childrens - .iter_mut() - .flat_map(|children| Vec::clone(children.output_schema())) - .collect_vec(); - Arc::new(out_columns) - } - Operator::Project(op) => { - let out_columns = op - .exprs - .iter() - .map(|expr| expr.output_column()) - .collect_vec(); - Arc::new(out_columns) } - Operator::TableScan(op) => { - let out_columns = op - .columns - .iter() - .map(|(_, column)| column.clone()) - .collect_vec(); - Arc::new(out_columns) - } - // FIXME: redundant clone - Operator::FunctionScan(op) => op.table_function.output_schema().clone(), - Operator::Values(ValuesOperator { schema_ref, .. }) - | Operator::Union(UnionOperator { - left_schema_ref: schema_ref, - ..
- }) => schema_ref.clone(), - Operator::Dummy => Arc::new(vec![]), - Operator::Show => Arc::new(vec![ColumnRef::from(ColumnCatalog::new_dummy( - "TABLE".to_string(), - ))]), - Operator::Explain => Arc::new(vec![ColumnRef::from(ColumnCatalog::new_dummy( - "PLAN".to_string(), - ))]), - Operator::Describe(_) => Arc::new(vec![ - ColumnRef::from(ColumnCatalog::new_dummy("FIELD".to_string())), - ColumnRef::from(ColumnCatalog::new_dummy("TYPE".to_string())), - ColumnRef::from(ColumnCatalog::new_dummy("LEN".to_string())), - ColumnRef::from(ColumnCatalog::new_dummy("NULL".to_string())), - ColumnRef::from(ColumnCatalog::new_dummy("Key".to_string())), - ColumnRef::from(ColumnCatalog::new_dummy("DEFAULT".to_string())), - ]), - Operator::Insert(_) => Arc::new(vec![ColumnRef::from(ColumnCatalog::new_dummy( - "INSERTED".to_string(), - ))]), - Operator::Update(_) => Arc::new(vec![ColumnRef::from(ColumnCatalog::new_dummy( - "UPDATED".to_string(), - ))]), - Operator::Delete(_) => Arc::new(vec![ColumnRef::from(ColumnCatalog::new_dummy( - "DELETED".to_string(), - ))]), - Operator::Analyze(_) => Arc::new(vec![ColumnRef::from(ColumnCatalog::new_dummy( - "STATISTICS_META_PATH".to_string(), - ))]), - Operator::AddColumn(_) => Arc::new(vec![ColumnRef::from( - ColumnCatalog::new_dummy("ADD COLUMN SUCCESS".to_string()), - )]), - Operator::DropColumn(_) => Arc::new(vec![ColumnRef::from( - ColumnCatalog::new_dummy("DROP COLUMN SUCCESS".to_string()), - )]), - Operator::CreateTable(_) => Arc::new(vec![ColumnRef::from( - ColumnCatalog::new_dummy("CREATE TABLE SUCCESS".to_string()), - )]), - Operator::CreateIndex(_) => Arc::new(vec![ColumnRef::from( - ColumnCatalog::new_dummy("CREATE INDEX SUCCESS".to_string()), - )]), - Operator::CreateView(_) => Arc::new(vec![ColumnRef::from( - ColumnCatalog::new_dummy("CREATE VIEW SUCCESS".to_string()), - )]), - Operator::DropTable(_) => Arc::new(vec![ColumnRef::from( - ColumnCatalog::new_dummy("DROP TABLE SUCCESS".to_string()), - )]), - Operator::Truncate(_) => Arc::new(vec![ColumnRef::from(ColumnCatalog::new_dummy( - "TRUNCATE TABLE SUCCESS".to_string(), - ))]), - Operator::CopyFromFile(_) => Arc::new(vec![ColumnRef::from( - ColumnCatalog::new_dummy("COPY FROM SOURCE".to_string()), - )]), - Operator::CopyToFile(_) => todo!(), - }) + SchemaOutput::Schema(columns) + } + Operator::Project(op) => SchemaOutput::Schema( + op.exprs + .iter() + .map(|expr| expr.output_column()) + .collect_vec(), + ), + Operator::TableScan(op) => SchemaOutput::Schema( + op.columns + .iter() + .map(|(_, column)| column.clone()) + .collect_vec(), + ), + Operator::FunctionScan(op) => { + SchemaOutput::SchemaRef(op.table_function.output_schema().clone()) + } + Operator::Values(ValuesOperator { schema_ref, .. }) + | Operator::Union(UnionOperator { + left_schema_ref: schema_ref, + .. 
+ }) => SchemaOutput::SchemaRef(schema_ref.clone()), + Operator::Dummy => SchemaOutput::Schema(vec![]), + Operator::Show => SchemaOutput::Schema(vec![ColumnRef::from( + ColumnCatalog::new_dummy("TABLE".to_string()), + )]), + Operator::Explain => SchemaOutput::Schema(vec![ColumnRef::from( + ColumnCatalog::new_dummy("PLAN".to_string()), + )]), + Operator::Describe(_) => SchemaOutput::Schema(vec![ + ColumnRef::from(ColumnCatalog::new_dummy("FIELD".to_string())), + ColumnRef::from(ColumnCatalog::new_dummy("TYPE".to_string())), + ColumnRef::from(ColumnCatalog::new_dummy("LEN".to_string())), + ColumnRef::from(ColumnCatalog::new_dummy("NULL".to_string())), + ColumnRef::from(ColumnCatalog::new_dummy("Key".to_string())), + ColumnRef::from(ColumnCatalog::new_dummy("DEFAULT".to_string())), + ]), + Operator::Insert(_) => SchemaOutput::Schema(vec![ColumnRef::from( + ColumnCatalog::new_dummy("INSERTED".to_string()), + )]), + Operator::Update(_) => SchemaOutput::Schema(vec![ColumnRef::from( + ColumnCatalog::new_dummy("UPDATED".to_string()), + )]), + Operator::Delete(_) => SchemaOutput::Schema(vec![ColumnRef::from( + ColumnCatalog::new_dummy("DELETED".to_string()), + )]), + Operator::Analyze(_) => SchemaOutput::Schema(vec![ColumnRef::from( + ColumnCatalog::new_dummy("STATISTICS_META_PATH".to_string()), + )]), + Operator::AddColumn(_) => SchemaOutput::Schema(vec![ColumnRef::from( + ColumnCatalog::new_dummy("ADD COLUMN SUCCESS".to_string()), + )]), + Operator::DropColumn(_) => SchemaOutput::Schema(vec![ColumnRef::from( + ColumnCatalog::new_dummy("DROP COLUMN SUCCESS".to_string()), + )]), + Operator::CreateTable(_) => SchemaOutput::Schema(vec![ColumnRef::from( + ColumnCatalog::new_dummy("CREATE TABLE SUCCESS".to_string()), + )]), + Operator::CreateIndex(_) => SchemaOutput::Schema(vec![ColumnRef::from( + ColumnCatalog::new_dummy("CREATE INDEX SUCCESS".to_string()), + )]), + Operator::CreateView(_) => SchemaOutput::Schema(vec![ColumnRef::from( + ColumnCatalog::new_dummy("CREATE VIEW SUCCESS".to_string()), + )]), + Operator::DropTable(_) => SchemaOutput::Schema(vec![ColumnRef::from( + ColumnCatalog::new_dummy("DROP TABLE SUCCESS".to_string()), + )]), + Operator::Truncate(_) => SchemaOutput::Schema(vec![ColumnRef::from( + ColumnCatalog::new_dummy("TRUNCATE TABLE SUCCESS".to_string()), + )]), + Operator::CopyFromFile(_) => SchemaOutput::Schema(vec![ColumnRef::from( + ColumnCatalog::new_dummy("COPY FROM SOURCE".to_string()), + )]), + Operator::CopyToFile(_) => todo!(), + } + } + + pub(crate) fn output_schema_direct(&self) -> SchemaOutput { + Self::_output_schema_direct(&self.operator, &self.childrens) + } + + pub fn output_schema(&mut self) -> &SchemaRef { + self._output_schema_ref.get_or_insert_with(|| { + match Self::_output_schema_direct(&self.operator, &self.childrens) { + SchemaOutput::Schema(schema) => Arc::new(schema), + SchemaOutput::SchemaRef(schema_ref) => schema_ref.clone(), + } + }) } pub fn explain(&self, indentation: usize) -> String { diff --git a/src/planner/operator/drop_table.rs b/src/planner/operator/drop_table.rs index e3c5b212..6a8e0519 100644 --- a/src/planner/operator/drop_table.rs +++ b/src/planner/operator/drop_table.rs @@ -12,7 +12,11 @@ pub struct DropTableOperator { impl fmt::Display for DropTableOperator { fn fmt(&self, f: &mut Formatter) -> fmt::Result { - write!(f, "Drop {}, If Exists: {}", self.table_name, self.if_exists)?; + write!( + f, + "Drop Table {}, If Exists: {}", + self.table_name, self.if_exists + )?; Ok(()) } diff --git a/src/serdes/column.rs b/src/serdes/column.rs index 
a3211422..8c85779d 100644 --- a/src/serdes/column.rs +++ b/src/serdes/column.rs @@ -50,7 +50,7 @@ impl ReferenceSerialization for ColumnRef { ) = (&summary.relation, drive) { let table = transaction - .table(table_cache, table_name.clone()) + .table(table_cache, table_name.clone())? .ok_or(DatabaseError::TableNotFound)?; let column = table .get_column_by_id(column_id) @@ -174,7 +174,7 @@ pub(crate) mod test { let mut reference_tables = ReferenceTables::new(); let c3_column_id = { let table = transaction - .table(&table_cache, Arc::new("t1".to_string())) + .table(&table_cache, Arc::new("t1".to_string()))? .unwrap(); *table.get_column_id_by_name("c3").unwrap() }; diff --git a/src/storage/mod.rs b/src/storage/mod.rs index bf5c9bc2..f271e6fe 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -56,7 +56,7 @@ pub trait Transaction: Sized { debug_assert!(columns.iter().map(|(i, _)| i).all_unique()); let table = self - .table(table_cache, table_name.clone()) + .table(table_cache, table_name.clone())? .ok_or(DatabaseError::TableNotFound)?; let table_types = table.types(); if columns.is_empty() { @@ -96,7 +96,7 @@ debug_assert!(columns.iter().map(|(i, _)| i).all_unique()); let table = self - .table(table_cache, table_name.clone()) + .table(table_cache, table_name.clone())? .ok_or(DatabaseError::TableNotFound)?; let table_types = table.types(); let table_name = table.name.as_str(); @@ -135,7 +135,7 @@ column_ids: Vec<ColumnId>, ty: IndexType, ) -> Result<IndexMetaRef, DatabaseError> { - if let Some(mut table) = self.table(table_cache, table_name.clone()).cloned() { + if let Some(mut table) = self.table(table_cache, table_name.clone())?.cloned() { let index_meta = table.add_index_meta(index_name, column_ids, ty)?; let (key, value) = TableCodec::encode_index_meta(table_name, index_meta)?; self.set(key, value)?; @@ -217,7 +217,7 @@ column: &ColumnCatalog, if_not_exists: bool, ) -> Result<ColumnId, DatabaseError> { - if let Some(mut table) = self.table(table_cache, table_name.clone()).cloned() { + if let Some(mut table) = self.table(table_cache, table_name.clone())?.cloned() { if !column.nullable() && column.default_value()?.is_none() { return Err(DatabaseError::NeedNullAbleOrDefault); } @@ -262,7 +262,7 @@ table_name: &TableName, column_name: &str, ) -> Result<(), DatabaseError> { - if let Some(table_catalog) = self.table(table_cache, table_name.clone()).cloned() { + if let Some(table_catalog) = self.table(table_cache, table_name.clone())?.cloned() { let column = table_catalog.get_column_by_name(column_name).unwrap(); let (key, _) = TableCodec::encode_column(column, &mut ReferenceTables::new())?; @@ -345,7 +345,7 @@ table_name: TableName, if_exists: bool, ) -> Result<(), DatabaseError> { - if self.table(table_cache, table_name.clone()).is_none() { + if self.table(table_cache, table_name.clone())?.is_none() { if if_exists { return Ok(()); } else { @@ -384,30 +384,35 @@ view_cache: &'a ViewCache, view_name: TableName, drive: (&Self, &TableCache), - ) -> Option<&'a View> { - view_cache - .get_or_insert(view_name.clone(), |_| { - let bytes = self - .get(&TableCodec::encode_view_key(&view_name))? - .ok_or(DatabaseError::ViewExists)?; - TableCodec::decode_view(&bytes, drive) - }) - .ok() + ) -> Result<Option<&'a View>, DatabaseError> { + if let Some(view) = view_cache.get(&view_name) { + return Ok(Some(view)); + } + let Some(bytes) = self.get(&TableCodec::encode_view_key(&view_name))? 
else { + return Ok(None); + }; + Ok(Some(view_cache.get_or_insert(view_name.clone(), |_| { + TableCodec::decode_view(&bytes, drive) + })?)) } fn table<'a>( &'a self, table_cache: &'a TableCache, table_name: TableName, - ) -> Option<&TableCatalog> { - table_cache - .get_or_insert(table_name.clone(), |_| { - // `TableCache` is not theoretically used in `table_collect` because ColumnCatalog should not depend on other Column - let (columns, indexes) = self.table_collect(table_name.clone())?; + ) -> Result<Option<&'a TableCatalog>, DatabaseError> { + if let Some(table) = table_cache.get(&table_name) { + return Ok(Some(table)); + } - TableCatalog::reload(table_name.clone(), columns, indexes) + // `TableCache` is not theoretically used in `table_collect` because ColumnCatalog should not depend on other Column + self.table_collect(&table_name)? + .map(|(columns, indexes)| { + table_cache.get_or_insert(table_name.clone(), |_| { + TableCatalog::reload(table_name, columns, indexes) + }) }) - .ok() + .transpose() } fn table_metas(&self) -> Result<Vec<TableMeta>, DatabaseError> { @@ -465,25 +470,29 @@ pub trait Transaction: Sized { Ok(()) } - fn meta_loader<'a>(&'a self, meta_cache: &'a StatisticsMetaCache) -> StatisticMetaLoader<Self> + fn meta_loader<'a>( + &'a self, + meta_cache: &'a StatisticsMetaCache, + ) -> StatisticMetaLoader<'a, Self> where Self: Sized, { StatisticMetaLoader::new(self, meta_cache) } + #[allow(clippy::type_complexity)] fn table_collect( &self, - table_name: TableName, - ) -> Result<(Vec<ColumnRef>, Vec<IndexMetaRef>), DatabaseError> { - let (table_min, table_max) = TableCodec::table_bound(&table_name); + table_name: &TableName, + ) -> Result<Option<(Vec<ColumnRef>, Vec<IndexMetaRef>)>, DatabaseError> { + let (table_min, table_max) = TableCodec::table_bound(table_name); let mut column_iter = self.range(Bound::Included(&table_min), Bound::Included(&table_max))?; let mut columns = Vec::new(); let mut index_metas = Vec::new(); let mut reference_tables = ReferenceTables::new(); - let _ = reference_tables.push_or_replace(&table_name); + let _ = reference_tables.push_or_replace(table_name); // Tips: only `Column`, `IndexMeta`, `TableMeta` while let Some((key, value)) = column_iter.try_next().ok().flatten() { @@ -498,7 +507,7 @@ pub trait Transaction: Sized { } } - Ok((columns, index_metas)) + Ok((!columns.is_empty()).then_some((columns, index_metas))) } fn _drop_data(&mut self, min: &[u8], max: &[u8]) -> Result<(), DatabaseError> { @@ -1121,7 +1130,7 @@ mod test { table_cache: &TableCache| -> Result<(), DatabaseError> { let table = transaction - .table(&table_cache, Arc::new("t1".to_string())) + .table(&table_cache, Arc::new("t1".to_string()))? .unwrap(); let c1_column_id = *table.get_column_id_by_name("c1").unwrap(); let c2_column_id = *table.get_column_id_by_name("c2").unwrap(); @@ -1288,7 +1297,7 @@ mod test { build_table(&table_cache, &mut transaction)?; let (c2_column_id, c3_column_id) = { let t1_table = transaction - .table(&table_cache, Arc::new("t1".to_string())) + .table(&table_cache, Arc::new("t1".to_string()))? .unwrap(); ( @@ -1316,7 +1325,7 @@ mod test { table_cache: &TableCache| -> Result<(), DatabaseError> { let table = transaction - .table(&table_cache, Arc::new("t1".to_string())) + .table(&table_cache, Arc::new("t1".to_string()))? .unwrap(); let i1_meta = table.indexes[1].clone(); @@ -1392,7 +1401,7 @@ mod test { build_table(&table_cache, &mut transaction)?; let t1_table = transaction - .table(&table_cache, Arc::new("t1".to_string())) + .table(&table_cache, Arc::new("t1".to_string()))? 
.unwrap(); let c3_column_id = *t1_table.get_column_id_by_name("c3").unwrap(); @@ -1499,7 +1508,9 @@ mod test { ); } { - let table = transaction.table(&table_cache, table_name.clone()).unwrap(); + let table = transaction + .table(&table_cache, table_name.clone())? + .unwrap(); assert!(table.contains_column("c4")); let mut new_column = ColumnCatalog::new( @@ -1519,7 +1530,9 @@ mod test { } transaction.drop_column(&table_cache, &meta_cache, &table_name, "c4")?; { - let table = transaction.table(&table_cache, table_name.clone()).unwrap(); + let table = transaction + .table(&table_cache, table_name.clone())? + .unwrap(); assert!(!table.contains_column("c4")); assert!(table.get_column_by_name("c4").is_none()); } @@ -1530,7 +1543,6 @@ mod test { #[test] fn test_view_create_drop() -> Result<(), DatabaseError> { let table_state = build_t1_table()?; - let view_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?); let view_name = Arc::new("v1".to_string()); let view = View { @@ -1540,29 +1552,29 @@ mod test { ), }; let mut transaction = table_state.storage.transaction()?; - transaction.create_view(&view_cache, view.clone(), true)?; + transaction.create_view(&table_state.view_cache, view.clone(), true)?; assert_eq!( &view, transaction .view( - &view_cache, + &table_state.view_cache, view_name.clone(), (&transaction, &table_state.table_cache) - ) + )? .unwrap() ); assert_eq!( &view, transaction .view( - &view_cache, + &table_state.view_cache, view_name.clone(), ( &transaction, &Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?) ) - ) + )? .unwrap() ); diff --git a/src/storage/rocksdb.rs b/src/storage/rocksdb.rs index e4bc9e6d..5e436aa7 100644 --- a/src/storage/rocksdb.rs +++ b/src/storage/rocksdb.rs @@ -180,7 +180,7 @@ mod test { false, )?; - let table_catalog = transaction.table(&table_cache, Arc::new("test".to_string())); + let table_catalog = transaction.table(&table_cache, Arc::new("test".to_string()))?; debug_assert!(table_catalog.is_some()); debug_assert!(table_catalog .unwrap() @@ -242,7 +242,7 @@ mod test { let table_name = Arc::new("t1".to_string()); let table = transaction - .table(&fnck_sql.table_cache, table_name.clone()) + .table(&fnck_sql.table_cache, table_name.clone())? .unwrap() .clone(); let a_column_id = table.get_column_id_by_name("a").unwrap(); @@ -300,7 +300,7 @@ mod test { let transaction = fnck_sql.storage.transaction().unwrap(); let table = transaction - .table(&fnck_sql.table_cache, Arc::new("t1".to_string())) + .table(&fnck_sql.table_cache, Arc::new("t1".to_string()))? 
.unwrap() .clone(); let columns = table.columns().cloned().enumerate().collect_vec(); diff --git a/src/types/value.rs b/src/types/value.rs index 7228b604..01d727b2 100644 --- a/src/types/value.rs +++ b/src/types/value.rs @@ -240,11 +240,7 @@ impl Hash for DataValue { Date64(v) => v.hash(state), Time(v) => v.hash(state), Decimal(v) => v.hash(state), - Tuple(values) => { - for v in values { - v.hash(state) - } - } + Tuple(values) => values.hash(state), } } } diff --git a/src/utils/lru.rs b/src/utils/lru.rs index 97cd073f..e22d0d03 100644 --- a/src/utils/lru.rs +++ b/src/utils/lru.rs @@ -256,6 +256,7 @@ impl<K: Hash + Eq + PartialEq, V> LruCache<K, V> { } #[inline] + #[allow(clippy::manual_inspect)] pub fn put(&mut self, key: K, value: V) -> Option<V> { let node = NodeReadPtr(Box::leak(Box::new(Node::new(key, value))).into()); let old_node = self.inner.remove(&KeyRef(node)).map(|node| { @@ -323,9 +324,8 @@ impl<K: Hash + Eq + PartialEq, V> LruCache<K, V> { } else { let value = fn_once(&key)?; let node = NodeReadPtr(Box::leak(Box::new(Node::new(key, value))).into()); - let _ignore = self.inner.remove(&KeyRef(node)).map(|node| { + self.inner.remove(&KeyRef(node)).inspect(|&node| { self.detach(node); - node }); self.expulsion(); self.attach(node);
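A closing note on the storage API change above: `Transaction::table` and `Transaction::view` now return `Result<Option<_>, DatabaseError>` instead of a bare `Option`, so storage and decode failures are no longer silently collapsed into "not found". Every call site therefore unwraps two layers; both patterns below are taken directly from the diff:

    // In ordinary functions, `?` handles the error layer and `ok_or` the absence:
    let table = transaction
        .table(&table_cache, table_name.clone())?   // DatabaseError propagates here
        .ok_or(DatabaseError::TableNotFound)?;      // a genuinely missing table

    // Inside coroutine-based executors, where `?` is unavailable, the crate's
    // `throw!` macro is applied once per layer:
    let table = throw!(throw!(transaction.table(cache.0, self.table_name.clone()))
        .ok_or(DatabaseError::TableNotFound));

The same split is what lets `table_collect` return `Ok(None)` when a table's key range contains no columns, reserving `Err` for genuine codec or storage failures.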