From 07c40ea0c3df9c6f8d9f2b51b181296c9c207080 Mon Sep 17 00:00:00 2001 From: Kould <2435992353@qq.com> Date: Sat, 17 Feb 2024 14:21:43 +0800 Subject: [PATCH 1/2] refactor: reconstructed HashJoin execution operator implementation --- src/execution/volcano/dql/join/hash_join.rs | 312 +++++++++----------- 1 file changed, 135 insertions(+), 177 deletions(-) diff --git a/src/execution/volcano/dql/join/hash_join.rs b/src/execution/volcano/dql/join/hash_join.rs index 58d51339..e4004277 100644 --- a/src/execution/volcano/dql/join/hash_join.rs +++ b/src/execution/volcano/dql/join/hash_join.rs @@ -6,12 +6,13 @@ use crate::expression::ScalarExpression; use crate::planner::operator::join::{JoinCondition, JoinOperator, JoinType}; use crate::planner::LogicalPlan; use crate::storage::Transaction; -use crate::types::tuple::Tuple; -use crate::types::value::{DataValue, ValueRef}; -use ahash::{HashMap, HashSet, HashSetExt, RandomState}; +use crate::types::tuple::{SchemaRef, Tuple}; +use crate::types::value::{DataValue, ValueRef, NULL_VALUE}; +use ahash::HashMap; +use futures::stream::BoxStream; +use futures::{stream, StreamExt}; use futures_async_stream::try_stream; use itertools::Itertools; -use std::mem; use std::sync::Arc; pub struct HashJoin { @@ -47,22 +48,21 @@ impl ReadExecutor for HashJoin { pub(crate) struct HashJoinStatus { ty: JoinType, filter: Option, + build_map: HashMap, (Vec, bool)>, - join_columns: Vec, - used_set: HashSet, - build_map: HashMap>, - hash_random_state: RandomState, - - left_init_flag: bool, - left_force_nullable: bool, + full_schema_ref: SchemaRef, + left_schema_len: usize, on_left_keys: Vec, - right_init_flag: bool, - right_force_nullable: bool, on_right_keys: Vec, } impl HashJoinStatus { - pub(crate) fn new(on: JoinCondition, ty: JoinType) -> Self { + pub(crate) fn new( + on: JoinCondition, + ty: JoinType, + left_schema: &SchemaRef, + right_schema: &SchemaRef, + ) -> Self { if ty == JoinType::Cross { unreachable!("Cross join should not be in HashJoinExecutor"); } @@ -73,22 +73,33 @@ impl HashJoinStatus { JoinCondition::On { on, filter } => (on.into_iter().unzip(), filter), JoinCondition::None => unreachable!("HashJoin must has on condition"), }; + + let fn_process = |schema: &mut Vec, force_nullable| { + for column in schema.iter_mut() { + let mut temp = ColumnCatalog::clone(column); + temp.nullable = force_nullable; + + *column = Arc::new(temp); + } + }; let (left_force_nullable, right_force_nullable) = joins_nullable(&ty); + let left_schema_len = left_schema.len(); + + let mut join_schema = Vec::clone(left_schema); + fn_process(&mut join_schema, left_force_nullable); + let mut right_schema = Vec::clone(right_schema); + fn_process(&mut right_schema, right_force_nullable); + + join_schema.append(&mut right_schema); HashJoinStatus { ty, filter, - - join_columns: vec![], - used_set: HashSet::new(), build_map: Default::default(), - hash_random_state: RandomState::with_seeds(0, 0, 0, 0), - left_init_flag: false, - left_force_nullable, + full_schema_ref: Arc::new(join_schema), + left_schema_len, on_left_keys, - right_init_flag: false, - right_force_nullable, on_right_keys, } } @@ -96,209 +107,148 @@ impl HashJoinStatus { pub(crate) fn left_build(&mut self, tuple: Tuple) -> Result<(), DatabaseError> { let HashJoinStatus { on_left_keys, - hash_random_state, - left_init_flag, - join_columns, - left_force_nullable, build_map, .. } = self; + let values = Self::eval_keys(on_left_keys, &tuple)?; - let hash = Self::hash_row(on_left_keys, hash_random_state, &tuple)?; - - if !*left_init_flag { - Self::columns_filling(&tuple, join_columns, *left_force_nullable); - let _ = mem::replace(left_init_flag, true); - } - - build_map.entry(hash).or_insert_with(Vec::new).push(tuple); + build_map + .entry(values) + .or_insert_with(|| (Vec::new(), false)) + .0 + .push(tuple); Ok(()) } pub(crate) fn right_probe(&mut self, tuple: Tuple) -> Result, DatabaseError> { let HashJoinStatus { - hash_random_state, - join_columns, on_right_keys, - right_init_flag, - right_force_nullable, + full_schema_ref, build_map, - used_set, ty, filter, + left_schema_len, .. } = self; let right_cols_len = tuple.schema_ref.len(); - let hash = Self::hash_row(on_right_keys, hash_random_state, &tuple)?; - - if !*right_init_flag { - Self::columns_filling(&tuple, join_columns, *right_force_nullable); - let _ = mem::replace(right_init_flag, true); - } - let join_columns = Arc::new(join_columns.clone()); - - let mut join_tuples = if let Some(tuples) = build_map.get(&hash) { - let _ = used_set.insert(hash); - - tuples - .iter() - .map(|Tuple { values, .. }| { - let full_values = values - .iter() - .cloned() - .chain(tuple.values.clone()) - .collect_vec(); - - Tuple { - id: None, - schema_ref: join_columns.clone(), - values: full_values, - } - }) - .collect_vec() + let values = Self::eval_keys(on_right_keys, &tuple)?; + + let mut join_tuples = Vec::with_capacity(1); + if let Some((tuples, is_used)) = build_map.get_mut(&values) { + *is_used = true; + join_tuples.reserve(tuples.len()); + + for Tuple { values, .. } in tuples { + let full_values = values + .iter() + .cloned() + .chain(tuple.values.clone()) + .collect_vec(); + let tuple = Tuple { + id: None, + schema_ref: full_schema_ref.clone(), + values: full_values, + }; + if let Some(tuple) = Self::filter(tuple, filter, ty, *left_schema_len)? { + join_tuples.push(tuple); + } + } } else if matches!(ty, JoinType::Right | JoinType::Full) { - let empty_len = join_columns.len() - right_cols_len; - let values = join_columns[..empty_len] - .iter() - .map(|col| Arc::new(DataValue::none(col.datatype()))) + let empty_len = full_schema_ref.len() - right_cols_len; + let values = (0..empty_len) + .map(|_| NULL_VALUE.clone()) .chain(tuple.values) .collect_vec(); - - vec![Tuple { + let tuple = Tuple { id: None, - schema_ref: join_columns.clone(), + schema_ref: full_schema_ref.clone(), values, - }] - } else { - vec![] - }; + }; + if let Some(tuple) = Self::filter(tuple, filter, ty, *left_schema_len)? { + join_tuples.push(tuple); + } + } - // on filter - if let (Some(expr), false) = ( - &filter, - join_tuples.is_empty() || matches!(ty, JoinType::Full | JoinType::Cross), - ) { - let mut filter_tuples = Vec::new(); - - for mut tuple in join_tuples { - if let DataValue::Boolean(option) = expr.eval(&tuple)?.as_ref() { - if let Some(false) | None = option { - let full_cols_len = tuple.schema_ref.len(); - let left_cols_len = full_cols_len - right_cols_len; - - match ty { - JoinType::Left => { - for i in left_cols_len..full_cols_len { - let value_type = tuple.schema_ref[i].datatype(); - - tuple.values[i] = Arc::new(DataValue::none(value_type)) - } - filter_tuples.push(tuple) - } - JoinType::Right => { - for i in 0..left_cols_len { - let value_type = tuple.schema_ref[i].datatype(); + Ok(join_tuples) + } - tuple.values[i] = Arc::new(DataValue::none(value_type)) - } - filter_tuples.push(tuple) + pub(crate) fn filter( + mut tuple: Tuple, + filter: &Option, + join_ty: &JoinType, + left_schema_len: usize, + ) -> Result, DatabaseError> { + if let (Some(expr), false) = (filter, matches!(join_ty, JoinType::Full | JoinType::Cross)) { + match expr.eval(&tuple)?.as_ref() { + DataValue::Boolean(Some(false) | None) => { + let full_schema_len = tuple.schema_ref.len(); + + match join_ty { + JoinType::Left => { + for i in left_schema_len..full_schema_len { + tuple.values[i] = NULL_VALUE.clone(); + } + } + JoinType::Right => { + for i in 0..left_schema_len { + tuple.values[i] = NULL_VALUE.clone(); } - _ => (), } - } else { - filter_tuples.push(tuple) + _ => return Ok(None), } - } else { - unreachable!("only bool"); } + DataValue::Boolean(Some(true)) => (), + _ => return Err(DatabaseError::InvalidType), } - - join_tuples = filter_tuples; } - Ok(join_tuples) + Ok(Some(tuple)) } - pub(crate) fn build_drop(&mut self) -> Vec { + pub(crate) fn build_drop(&mut self) -> Option> { let HashJoinStatus { - join_columns, + full_schema_ref, build_map, - used_set, ty, .. } = self; - matches!(ty, JoinType::Left | JoinType::Full) - .then(|| { - let mut buf = None; - + matches!(ty, JoinType::Left | JoinType::Full).then(|| { + stream::iter( build_map .drain() - .filter(|(hash, _)| !used_set.contains(hash)) - .flat_map(|(_, mut tuples)| { - for Tuple { - values, - schema_ref: columns, - id, - } in tuples.iter_mut() - { - let _ = id.take(); - let (right_values, full_columns) = buf.get_or_insert_with(|| { - let (right_values, mut right_columns): ( - Vec, - Vec, - ) = join_columns[columns.len()..] - .iter() - .map(|col| { - (Arc::new(DataValue::none(col.datatype())), col.clone()) - }) - .unzip(); - let mut full_columns = Vec::clone(columns); - full_columns.append(&mut right_columns); - - (right_values, Arc::new(full_columns)) - }); - - values.append(&mut right_values.clone()); - *columns = full_columns.clone(); + .filter_map(|(_, (mut left_tuples, is_used))| { + if !is_used { + for tuple in left_tuples.iter_mut() { + tuple.schema_ref = full_schema_ref.clone(); + + while tuple.values.len() != full_schema_ref.len() { + tuple.values.push(NULL_VALUE.clone()); + } + } + return Some(left_tuples); } - tuples + None }) - .collect_vec() - }) - .unwrap_or_else(Vec::new) + .flatten(), + ) + .boxed() + }) } - fn columns_filling(tuple: &Tuple, join_columns: &mut Vec, force_nullable: bool) { - let mut new_columns = tuple - .schema_ref - .iter() - .cloned() - .map(|col| { - let mut new_catalog = ColumnCatalog::clone(&col); - new_catalog.nullable = force_nullable; - - Arc::new(new_catalog) - }) - .collect_vec(); - - join_columns.append(&mut new_columns); - } - - fn hash_row( + fn eval_keys( on_keys: &[ScalarExpression], - hash_random_state: &RandomState, tuple: &Tuple, - ) -> Result { + ) -> Result, DatabaseError> { let mut values = Vec::with_capacity(on_keys.len()); for expr in on_keys { values.push(expr.eval(tuple)?); } - Ok(hash_random_state.hash_one(values)) + Ok(values) } } @@ -308,11 +258,16 @@ impl HashJoin { let HashJoin { on, ty, - left_input, - right_input, + mut left_input, + mut right_input, } = self; - let mut join_status = HashJoinStatus::new(on, ty); + let mut join_status = HashJoinStatus::new( + on, + ty, + left_input.output_schema(), + right_input.output_schema(), + ); // build phase: // 1.construct hashtable, one hash key may contains multiple rows indices. @@ -334,9 +289,12 @@ impl HashJoin { } } - for tuple in join_status.build_drop() { - yield tuple - } + if let Some(stream) = join_status.build_drop() { + #[for_await] + for tuple in stream { + yield tuple + } + }; } } From 8ad71df3b5d179a6567e31523264841905fadc35 Mon Sep 17 00:00:00 2001 From: Kould <2435992353@qq.com> Date: Sat, 17 Feb 2024 16:52:43 +0800 Subject: [PATCH 2/2] feat: support `Select Into` --- src/binder/insert.rs | 2 +- src/binder/select.rs | 23 +++++++++++++++++++- src/errors.rs | 2 -- tests/slt/basic_test.slt | 18 ++++++++-------- tests/slt/group_by.slt | 1 - tests/slt/having.slt | 9 ++++---- tests/slt/select_into.slt | 44 +++++++++++++++++++++++++++++++++++++++ tests/slt/where.slt | 11 +++++----- 8 files changed, 85 insertions(+), 25 deletions(-) create mode 100644 tests/slt/select_into.slt diff --git a/src/binder/insert.rs b/src/binder/insert.rs index 43f49ff9..4231bbc8 100644 --- a/src/binder/insert.rs +++ b/src/binder/insert.rs @@ -55,7 +55,7 @@ impl<'a, T: Transaction> Binder<'a, T> { let mut rows = Vec::with_capacity(expr_rows.len()); for expr_row in expr_rows { if expr_row.len() != values_len { - return Err(DatabaseError::ValuesLenNotSame()); + return Err(DatabaseError::ValuesLenMismatch(expr_row.len(), values_len)); } let mut row = Vec::with_capacity(expr_row.len()); diff --git a/src/binder/select.rs b/src/binder/select.rs index 477a7651..bfb6e887 100644 --- a/src/binder/select.rs +++ b/src/binder/select.rs @@ -20,6 +20,7 @@ use crate::catalog::{ColumnCatalog, ColumnSummary, TableName}; use crate::errors::DatabaseError; use crate::execution::volcano::dql::join::joins_nullable; use crate::expression::{AliasType, BinaryOperator}; +use crate::planner::operator::insert::InsertOperator; use crate::planner::operator::join::JoinCondition; use crate::planner::operator::sort::{SortField, SortOperator}; use crate::planner::operator::union::UnionOperator; @@ -30,7 +31,8 @@ use crate::types::LogicalType; use itertools::Itertools; use sqlparser::ast::{ Distinct, Expr, Ident, Join, JoinConstraint, JoinOperator, Offset, OrderByExpr, Query, Select, - SelectItem, SetExpr, SetOperator, SetQuantifier, TableAlias, TableFactor, TableWithJoins, + SelectInto, SelectItem, SetExpr, SetOperator, SetQuantifier, TableAlias, TableFactor, + TableWithJoins, }; impl<'a, T: Transaction> Binder<'a, T> { @@ -111,6 +113,25 @@ impl<'a, T: Transaction> Binder<'a, T> { plan = self.bind_project(plan, select_list)?; + if let Some(SelectInto { + name, + unlogged, + temporary, + .. + }) = &select.into + { + if *unlogged || *temporary { + todo!() + } + plan = LogicalPlan::new( + Operator::Insert(InsertOperator { + table_name: Arc::new(lower_case_name(name)?), + is_overwrite: false, + }), + vec![plan], + ) + } + Ok(plan) } diff --git a/src/errors.rs b/src/errors.rs index afda9b63..e6dad3fe 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -152,8 +152,6 @@ pub enum DatabaseError { AmbiguousColumn(String), #[error("values length not match, expect {0}, got {1}")] ValuesLenMismatch(usize, usize), - #[error("values list must all be the same length")] - ValuesLenNotSame(), #[error("binary operator types mismatch: {0} != {1}")] BinaryOpTypeMismatch(String, String), #[error("subquery error: {0}")] diff --git a/tests/slt/basic_test.slt b/tests/slt/basic_test.slt index 683ea9bd..34fff868 100644 --- a/tests/slt/basic_test.slt +++ b/tests/slt/basic_test.slt @@ -3,15 +3,15 @@ select 1 ---- 1 -# query R -# select 10000.00::FLOAT + 234.567::FLOAT -# ---- -# 10234.567 - -# query R -# select 100.0::DOUBLE/8.0::DOUBLE -# ---- -# 12.5 +query R +select 10000.00::FLOAT + 234.567::FLOAT +---- +10234.567 + +query R +select 100.0::DOUBLE/8.0::DOUBLE +---- +12.5 query B select 2>1 diff --git a/tests/slt/group_by.slt b/tests/slt/group_by.slt index f022ba56..6caec916 100644 --- a/tests/slt/group_by.slt +++ b/tests/slt/group_by.slt @@ -4,7 +4,6 @@ create table t (id int primary key, v1 int, v2 int) statement ok insert into t values (0,1,1), (1,2,1), (2,3,2), (3,4,2), (4,5,3) -# TODO: check on binder statement error select v2 + 1, v1 from t group by v2 + 1 diff --git a/tests/slt/having.slt b/tests/slt/having.slt index 89d25713..5454867c 100644 --- a/tests/slt/having.slt +++ b/tests/slt/having.slt @@ -19,11 +19,10 @@ select count(x) as a, y + 1 as b from test group by b having b + 1 = 24; ---- 1 23 -# TODO: Filter pushed down to Agg -# query II -# select x from test group by x having max(y) = 22 -# ---- -# 11 +query II +select x from test group by x having max(y) = 22 +---- +11 # query II # select y + 1 as i from test group by y + 1 having count(x) > 1 and y + 1 = 3 or y + 1 = 23 order by i; diff --git a/tests/slt/select_into.slt b/tests/slt/select_into.slt new file mode 100644 index 00000000..d2b9035c --- /dev/null +++ b/tests/slt/select_into.slt @@ -0,0 +1,44 @@ +statement ok +create table t1 (v1 int not null primary key, v2 int not null); + +statement ok +create table t2 (v1 int not null primary key, v2 int not null); + +statement ok +create table t3 (v1 int not null primary key, v2 int not null); + +statement ok +insert into t1 values (1, 1), (4, 6), (3, 2), (2, 1); + +query II rowsort +select * from t1; +---- +1 1 +2 1 +3 2 +4 6 + +statement ok +select * into t2 from t1; + +statement ok +select v2, v1 into t3 from t2 where v2 != 6; + +query II rowsort +select * from t2; +---- +1 1 +2 1 +3 2 +4 6 + +query II rowsort +select * from t3; +---- +1 1 +2 1 +3 2 + +statement ok +drop table t1; + diff --git a/tests/slt/where.slt b/tests/slt/where.slt index cc6ab478..ef88561a 100644 --- a/tests/slt/where.slt +++ b/tests/slt/where.slt @@ -69,12 +69,11 @@ create table t(id int primary key, v1 int null, v2 int); statement ok insert into t values (0, 1, 1), (1, null, 2), (2, null, 3), (3, 4, 4); -# TODO: Support `IsNull` -# query I -# select v2 from t where v1 is null; -# ---- -# 2 -# 3 +query I +select v2 from t where v1 is null; +---- +2 +3 # query I # select v2 from t where v1 is not null;