From dd1268f5716de1a2118b32122c791fe84d29593b Mon Sep 17 00:00:00 2001 From: Kould <2435992353@qq.com> Date: Sat, 7 Oct 2023 15:37:07 +0800 Subject: [PATCH] style: `inputs` restore --- src/execution/executor/ddl/create_table.rs | 2 +- src/execution/executor/ddl/drop_table.rs | 2 +- src/execution/executor/ddl/truncate.rs | 2 +- src/execution/executor/dml/copy_from_file.rs | 4 +- src/execution/executor/dml/delete.rs | 17 +++---- src/execution/executor/dml/insert.rs | 30 ++++++------ src/execution/executor/dml/update.rs | 38 ++++++++++----- .../executor/dql/aggregate/hash_agg.rs | 29 ++++++----- .../executor/dql/aggregate/simple_agg.rs | 17 ++++--- src/execution/executor/dql/dummy.rs | 2 +- src/execution/executor/dql/filter.rs | 17 +++---- src/execution/executor/dql/index_scan.rs | 2 +- src/execution/executor/dql/join/hash_join.rs | 48 +++++++++++++------ src/execution/executor/dql/limit.rs | 20 +++++--- src/execution/executor/dql/projection.rs | 20 ++++---- src/execution/executor/dql/seq_scan.rs | 2 +- src/execution/executor/dql/sort.rs | 25 ++++++---- src/execution/executor/dql/values.rs | 2 +- src/execution/executor/mod.rs | 40 ++++++++-------- src/execution/executor/show/show_table.rs | 2 +- 20 files changed, 191 insertions(+), 130 deletions(-) diff --git a/src/execution/executor/ddl/create_table.rs b/src/execution/executor/ddl/create_table.rs index f395bbe7..6bfd8613 100644 --- a/src/execution/executor/ddl/create_table.rs +++ b/src/execution/executor/ddl/create_table.rs @@ -18,7 +18,7 @@ impl From for CreateTable { } impl Executor for CreateTable { - fn execute(self, _inputs: Vec, transaction: &RefCell) -> BoxedExecutor { + fn execute(self, transaction: &RefCell) -> BoxedExecutor { unsafe { self._execute(transaction.as_ptr().as_mut().unwrap()) } } } diff --git a/src/execution/executor/ddl/drop_table.rs b/src/execution/executor/ddl/drop_table.rs index 0136153f..1b8c8cde 100644 --- a/src/execution/executor/ddl/drop_table.rs +++ b/src/execution/executor/ddl/drop_table.rs @@ -17,7 +17,7 @@ impl From for DropTable { } impl Executor for DropTable { - fn execute(self, _inputs: Vec, transaction: &RefCell) -> BoxedExecutor { + fn execute(self, transaction: &RefCell) -> BoxedExecutor { unsafe { self._execute(transaction.as_ptr().as_mut().unwrap()) } } } diff --git a/src/execution/executor/ddl/truncate.rs b/src/execution/executor/ddl/truncate.rs index 150bb45b..661959cd 100644 --- a/src/execution/executor/ddl/truncate.rs +++ b/src/execution/executor/ddl/truncate.rs @@ -17,7 +17,7 @@ impl From for Truncate { } impl Executor for Truncate { - fn execute(self, _inputs: Vec, transaction: &RefCell) -> BoxedExecutor { + fn execute(self, transaction: &RefCell) -> BoxedExecutor { unsafe { self._execute(transaction.as_ptr().as_mut().unwrap()) } } } diff --git a/src/execution/executor/dml/copy_from_file.rs b/src/execution/executor/dml/copy_from_file.rs index 89e460f2..a85c401b 100644 --- a/src/execution/executor/dml/copy_from_file.rs +++ b/src/execution/executor/dml/copy_from_file.rs @@ -23,7 +23,7 @@ impl From for CopyFromFile { } impl Executor for CopyFromFile { - fn execute(self, _inputs: Vec, transaction: &RefCell) -> BoxedExecutor { + fn execute(self, transaction: &RefCell) -> BoxedExecutor { unsafe { self._execute(transaction.as_ptr().as_mut().unwrap()) } } } @@ -195,7 +195,7 @@ mod tests { let storage = db.storage; let transaction = RefCell::new(storage.transaction().await?); let actual = executor - .execute(vec![], &transaction) + .execute(&transaction) .next() .await .unwrap()?; diff --git a/src/execution/executor/dml/delete.rs b/src/execution/executor/dml/delete.rs index caa219e9..d85c2549 100644 --- a/src/execution/executor/dml/delete.rs +++ b/src/execution/executor/dml/delete.rs @@ -11,24 +11,25 @@ use std::cell::RefCell; pub struct Delete { table_name: TableName, + input: BoxedExecutor, } -impl From for Delete { - fn from(DeleteOperator { table_name }: DeleteOperator) -> Delete { - Delete { table_name } +impl From<(DeleteOperator, BoxedExecutor)> for Delete { + fn from((DeleteOperator { table_name }, input): (DeleteOperator, BoxedExecutor)) -> Self { + Delete { table_name, input } } } impl Executor for Delete { - fn execute(self, inputs: Vec, transaction: &RefCell) -> BoxedExecutor { - unsafe { self._execute(transaction.as_ptr().as_mut().unwrap(), inputs) } + fn execute(self, transaction: &RefCell) -> BoxedExecutor { + unsafe { self._execute(transaction.as_ptr().as_mut().unwrap()) } } } impl Delete { #[try_stream(boxed, ok = Tuple, error = ExecutorError)] - async fn _execute(self, transaction: &mut T, mut inputs: Vec) { - let Delete { table_name } = self; + async fn _execute(self, transaction: &mut T) { + let Delete { table_name, input } = self; let option_index_metas = transaction.table(&table_name).map(|table_catalog| { table_catalog .all_columns() @@ -51,7 +52,7 @@ impl Delete { if let Some(index_metas) = option_index_metas { #[for_await] - for tuple in inputs.remove(0) { + for tuple in input { let tuple: Tuple = tuple?; for (i, index_meta) in index_metas.iter() { diff --git a/src/execution/executor/dml/insert.rs b/src/execution/executor/dml/insert.rs index ac7ad57c..a4f5e1a0 100644 --- a/src/execution/executor/dml/insert.rs +++ b/src/execution/executor/dml/insert.rs @@ -13,38 +13,40 @@ use std::sync::Arc; pub struct Insert { table_name: TableName, + input: BoxedExecutor, is_overwrite: bool, } -impl From for Insert { +impl From<(InsertOperator, BoxedExecutor)> for Insert { fn from( - InsertOperator { - table_name, - is_overwrite, - }: InsertOperator, - ) -> Insert { + ( + InsertOperator { + table_name, + is_overwrite, + }, + input, + ): (InsertOperator, BoxedExecutor), + ) -> Self { Insert { table_name, + input, is_overwrite, } } } impl Executor for Insert { - fn execute(self, inputs: Vec, transaction: &RefCell) -> BoxedExecutor { - unsafe { self._execute(transaction.as_ptr().as_mut().unwrap(), inputs) } + fn execute(self, transaction: &RefCell) -> BoxedExecutor { + unsafe { self._execute(transaction.as_ptr().as_mut().unwrap()) } } } impl Insert { #[try_stream(boxed, ok = Tuple, error = ExecutorError)] - pub async fn _execute( - self, - transaction: &mut T, - mut inputs: Vec, - ) { + pub async fn _execute(self, transaction: &mut T) { let Insert { table_name, + input, is_overwrite, } = self; let mut primary_key_index = None; @@ -52,7 +54,7 @@ impl Insert { if let Some(table_catalog) = transaction.table(&table_name).cloned() { #[for_await] - for tuple in inputs.remove(0) { + for tuple in input { let Tuple { columns, values, .. } = tuple?; diff --git a/src/execution/executor/dml/update.rs b/src/execution/executor/dml/update.rs index 6c6d12ab..6dd1055a 100644 --- a/src/execution/executor/dml/update.rs +++ b/src/execution/executor/dml/update.rs @@ -11,35 +11,47 @@ use std::collections::HashMap; pub struct Update { table_name: TableName, + input: BoxedExecutor, + values: BoxedExecutor, } -impl From for Update { - fn from(UpdateOperator { table_name }: UpdateOperator) -> Update { - Update { table_name } +impl From<(UpdateOperator, BoxedExecutor, BoxedExecutor)> for Update { + fn from( + (UpdateOperator { table_name }, input, values): ( + UpdateOperator, + BoxedExecutor, + BoxedExecutor, + ), + ) -> Self { + Update { + table_name, + input, + values, + } } } impl Executor for Update { - fn execute(self, inputs: Vec, transaction: &RefCell) -> BoxedExecutor { - unsafe { self._execute(transaction.as_ptr().as_mut().unwrap(), inputs) } + fn execute(self, transaction: &RefCell) -> BoxedExecutor { + unsafe { self._execute(transaction.as_ptr().as_mut().unwrap()) } } } impl Update { #[try_stream(boxed, ok = Tuple, error = ExecutorError)] - pub async fn _execute( - self, - transaction: &mut T, - mut inputs: Vec, - ) { - let Update { table_name } = self; + pub async fn _execute(self, transaction: &mut T, ) { + let Update { + table_name, + input, + values, + } = self; if let Some(table_catalog) = transaction.table(&table_name).cloned() { let mut value_map = HashMap::new(); // only once #[for_await] - for tuple in inputs.remove(1) { + for tuple in values { let Tuple { columns, values, .. } = tuple?; @@ -48,7 +60,7 @@ impl Update { } } #[for_await] - for tuple in inputs.remove(0) { + for tuple in input { let mut tuple: Tuple = tuple?; let mut is_overwrite = true; diff --git a/src/execution/executor/dql/aggregate/hash_agg.rs b/src/execution/executor/dql/aggregate/hash_agg.rs index 3d9ea1d7..968d2c05 100644 --- a/src/execution/executor/dql/aggregate/hash_agg.rs +++ b/src/execution/executor/dql/aggregate/hash_agg.rs @@ -14,36 +14,41 @@ use std::cell::RefCell; pub struct HashAggExecutor { pub agg_calls: Vec, pub groupby_exprs: Vec, + pub input: BoxedExecutor, } -impl From for HashAggExecutor { +impl From<(AggregateOperator, BoxedExecutor)> for HashAggExecutor { fn from( - AggregateOperator { - agg_calls, - groupby_exprs, - }: AggregateOperator, - ) -> HashAggExecutor { + ( + AggregateOperator { + agg_calls, + groupby_exprs, + }, + input, + ): (AggregateOperator, BoxedExecutor), + ) -> Self { HashAggExecutor { agg_calls, groupby_exprs, + input, } } } impl Executor for HashAggExecutor { - fn execute<'a>(self, inputs: Vec, _transaction: &RefCell) -> BoxedExecutor { - self._execute(inputs) + fn execute<'a>(self, _transaction: &RefCell) -> BoxedExecutor { + self._execute() } } impl HashAggExecutor { #[try_stream(boxed, ok = Tuple, error = ExecutorError)] - pub async fn _execute(self, mut inputs: Vec) { + pub async fn _execute(self) { let mut group_and_agg_columns_option = None; let mut group_hash_accs = HashMap::new(); #[for_await] - for tuple in inputs.remove(0) { + for tuple in self.input { let tuple = tuple?; // 1. build group and agg columns for hash_agg columns. @@ -189,10 +194,10 @@ mod test { ], columns: t1_columns, }) - .execute(vec![], &transaction); + .execute(&transaction); let tuples = - try_collect(&mut HashAggExecutor::from(operator).execute(vec![input], &transaction)) + try_collect(&mut HashAggExecutor::from((operator, input)).execute(&transaction)) .await?; println!("hash_agg_test: \n{}", create_table(&tuples)); diff --git a/src/execution/executor/dql/aggregate/simple_agg.rs b/src/execution/executor/dql/aggregate/simple_agg.rs index 60499c84..b1d39ee3 100644 --- a/src/execution/executor/dql/aggregate/simple_agg.rs +++ b/src/execution/executor/dql/aggregate/simple_agg.rs @@ -12,28 +12,31 @@ use std::cell::RefCell; pub struct SimpleAggExecutor { pub agg_calls: Vec, + pub input: BoxedExecutor, } -impl From for SimpleAggExecutor { - fn from(AggregateOperator { agg_calls, .. }: AggregateOperator) -> SimpleAggExecutor { - SimpleAggExecutor { agg_calls } +impl From<(AggregateOperator, BoxedExecutor)> for SimpleAggExecutor { + fn from( + (AggregateOperator { agg_calls, .. }, input): (AggregateOperator, BoxedExecutor), + ) -> Self { + SimpleAggExecutor { agg_calls, input } } } impl Executor for SimpleAggExecutor { - fn execute(self, inputs: Vec, _transaction: &RefCell) -> BoxedExecutor { - self._execute(inputs) + fn execute(self, _transaction: &RefCell) -> BoxedExecutor { + self._execute() } } impl SimpleAggExecutor { #[try_stream(boxed, ok = Tuple, error = ExecutorError)] - pub async fn _execute(self, mut inputs: Vec) { + pub async fn _execute(self) { let mut accs = create_accumulators(&self.agg_calls); let mut columns_option = None; #[for_await] - for tuple in inputs.remove(0) { + for tuple in self.input { let tuple = tuple?; columns_option.get_or_insert_with(|| { diff --git a/src/execution/executor/dql/dummy.rs b/src/execution/executor/dql/dummy.rs index 54ff607f..5e8e756f 100644 --- a/src/execution/executor/dql/dummy.rs +++ b/src/execution/executor/dql/dummy.rs @@ -8,7 +8,7 @@ use std::cell::RefCell; pub struct Dummy {} impl Executor for Dummy { - fn execute<'a>(self, _inputs: Vec, _transaction: &RefCell) -> BoxedExecutor { + fn execute<'a>(self, _transaction: &RefCell) -> BoxedExecutor { self._execute() } } diff --git a/src/execution/executor/dql/filter.rs b/src/execution/executor/dql/filter.rs index 30292dd9..53ff1d09 100644 --- a/src/execution/executor/dql/filter.rs +++ b/src/execution/executor/dql/filter.rs @@ -10,27 +10,28 @@ use std::cell::RefCell; pub struct Filter { predicate: ScalarExpression, + input: BoxedExecutor, } -impl From for Filter { - fn from(FilterOperator { predicate, .. }: FilterOperator) -> Filter { - Filter { predicate } +impl From<(FilterOperator, BoxedExecutor)> for Filter { + fn from((FilterOperator { predicate, .. }, input): (FilterOperator, BoxedExecutor)) -> Self { + Filter { predicate, input } } } impl Executor for Filter { - fn execute(self, inputs: Vec, _transaction: &RefCell) -> BoxedExecutor { - self._execute(inputs) + fn execute(self, _transaction: &RefCell) -> BoxedExecutor { + self._execute() } } impl Filter { #[try_stream(boxed, ok = Tuple, error = ExecutorError)] - pub async fn _execute(self, mut inputs: Vec) { - let Filter { predicate } = self; + pub async fn _execute(self) { + let Filter { predicate, input } = self; #[for_await] - for tuple in inputs.remove(0) { + for tuple in input { let tuple = tuple?; if let DataValue::Boolean(option) = predicate.eval_column(&tuple)?.as_ref() { if let Some(true) = option { diff --git a/src/execution/executor/dql/index_scan.rs b/src/execution/executor/dql/index_scan.rs index de04d4af..97d00963 100644 --- a/src/execution/executor/dql/index_scan.rs +++ b/src/execution/executor/dql/index_scan.rs @@ -18,7 +18,7 @@ impl From for IndexScan { } impl Executor for IndexScan { - fn execute(self, _inputs: Vec, transaction: &RefCell) -> BoxedExecutor { + fn execute(self, transaction: &RefCell) -> BoxedExecutor { unsafe { self._execute(transaction.as_ptr().as_ref().unwrap()) } } } diff --git a/src/execution/executor/dql/join/hash_join.rs b/src/execution/executor/dql/join/hash_join.rs index 41d8ac39..1fdc75bb 100644 --- a/src/execution/executor/dql/join/hash_join.rs +++ b/src/execution/executor/dql/join/hash_join.rs @@ -17,24 +17,42 @@ use std::sync::Arc; pub struct HashJoin { on: JoinCondition, ty: JoinType, + left_input: BoxedExecutor, + right_input: BoxedExecutor, } -impl From for HashJoin { - fn from(JoinOperator { on, join_type }: JoinOperator) -> HashJoin { - HashJoin { on, ty: join_type } +impl From<(JoinOperator, BoxedExecutor, BoxedExecutor)> for HashJoin { + fn from( + (JoinOperator { on, join_type }, left_input, right_input): ( + JoinOperator, + BoxedExecutor, + BoxedExecutor, + ), + ) -> Self { + HashJoin { + on, + ty: join_type, + left_input, + right_input, + } } } impl Executor for HashJoin { - fn execute(self, inputs: Vec, _transaction: &RefCell) -> BoxedExecutor { - self._execute(inputs) + fn execute(self, _transaction: &RefCell) -> BoxedExecutor { + self._execute() } } impl HashJoin { #[try_stream(boxed, ok = Tuple, error = ExecutorError)] - pub async fn _execute(self, mut inputs: Vec) { - let HashJoin { on, ty } = self; + pub async fn _execute(self) { + let HashJoin { + on, + ty, + left_input, + right_input, + } = self; if ty == JoinType::Cross { unreachable!("Cross join should not be in HashJoinExecutor"); @@ -59,7 +77,7 @@ impl HashJoin { // 2.merged all left tuples. let mut left_init_flag = false; #[for_await] - for tuple in inputs.remove(0) { + for tuple in left_input { let tuple: Tuple = tuple?; let hash = Self::hash_row(&on_left_keys, &hash_random_state, &tuple)?; @@ -74,7 +92,7 @@ impl HashJoin { // probe phase let mut right_init_flag = false; #[for_await] - for tuple in inputs.remove(0) { + for tuple in right_input { let tuple: Tuple = tuple?; let right_cols_len = tuple.columns.len(); let hash = Self::hash_row(&on_right_keys, &hash_random_state, &tuple)?; @@ -353,8 +371,8 @@ mod test { ( on_keys, - values_t1.execute(vec![], &_t), - values_t2.execute(vec![], &_t), + values_t1.execute(&_t), + values_t2.execute(&_t), ) } @@ -372,7 +390,7 @@ mod test { }, join_type: JoinType::Inner, }; - let mut executor = HashJoin::from(op).execute(vec![left, right], &transaction); + let mut executor = HashJoin::from((op, left, right)).execute(&transaction); let tuples = try_collect(&mut executor).await?; println!("inner_test: \n{}", create_table(&tuples)); @@ -409,7 +427,7 @@ mod test { }, join_type: JoinType::Left, }; - let mut executor = HashJoin::from(op).execute(vec![left, right], &transaction); + let mut executor = HashJoin::from((op, left, right)).execute(&transaction); let tuples = try_collect(&mut executor).await?; println!("left_test: \n{}", create_table(&tuples)); @@ -450,7 +468,7 @@ mod test { }, join_type: JoinType::Right, }; - let mut executor = HashJoin::from(op).execute(vec![left, right], &transaction); + let mut executor = HashJoin::from((op, left, right)).execute(&transaction); let tuples = try_collect(&mut executor).await?; println!("right_test: \n{}", create_table(&tuples)); @@ -491,7 +509,7 @@ mod test { }, join_type: JoinType::Full, }; - let mut executor = HashJoin::from(op).execute(vec![left, right], &transaction); + let mut executor = HashJoin::from((op, left, right)).execute(&transaction); let tuples = try_collect(&mut executor).await?; println!("full_test: \n{}", create_table(&tuples)); diff --git a/src/execution/executor/dql/limit.rs b/src/execution/executor/dql/limit.rs index a011af7d..b4c961b1 100644 --- a/src/execution/executor/dql/limit.rs +++ b/src/execution/executor/dql/limit.rs @@ -10,27 +10,33 @@ use std::cell::RefCell; pub struct Limit { offset: Option, limit: Option, + input: BoxedExecutor, } -impl From for Limit { - fn from(LimitOperator { offset, limit }: LimitOperator) -> Limit { +impl From<(LimitOperator, BoxedExecutor)> for Limit { + fn from((LimitOperator { offset, limit }, input): (LimitOperator, BoxedExecutor)) -> Self { Limit { offset: Some(offset), limit: Some(limit), + input, } } } impl Executor for Limit { - fn execute(self, inputs: Vec, _transaction: &RefCell) -> BoxedExecutor { - self._execute(inputs) + fn execute(self, _transaction: &RefCell) -> BoxedExecutor { + self._execute() } } impl Limit { #[try_stream(boxed, ok = Tuple, error = ExecutorError)] - pub async fn _execute(self, mut inputs: Vec) { - let Limit { offset, limit } = self; + pub async fn _execute(self) { + let Limit { + offset, + limit, + input, + } = self; if limit.is_some() && limit.unwrap() == 0 { return Ok(()); @@ -40,7 +46,7 @@ impl Limit { let offset_limit = offset_val + limit.unwrap_or(1) - 1; #[for_await] - for (i, tuple) in inputs.remove(0).enumerate() { + for (i, tuple) in input.enumerate() { if i < offset_val { continue; } else if i > offset_limit { diff --git a/src/execution/executor/dql/projection.rs b/src/execution/executor/dql/projection.rs index cf27d6eb..8285896a 100644 --- a/src/execution/executor/dql/projection.rs +++ b/src/execution/executor/dql/projection.rs @@ -9,27 +9,31 @@ use std::cell::RefCell; pub struct Projection { exprs: Vec, + input: BoxedExecutor, } -impl From for Projection { - fn from(ProjectOperator { columns }: ProjectOperator) -> Projection { - Projection { exprs: columns } +impl From<(ProjectOperator, BoxedExecutor)> for Projection { + fn from((ProjectOperator { columns }, input): (ProjectOperator, BoxedExecutor)) -> Self { + Projection { + exprs: columns, + input, + } } } impl Executor for Projection { - fn execute<'a>(self, inputs: Vec, _transaction: &RefCell) -> BoxedExecutor { - self._execute(inputs) + fn execute<'a>(self, _transaction: &RefCell) -> BoxedExecutor { + self._execute() } } impl Projection { #[try_stream(boxed, ok = Tuple, error = ExecutorError)] - pub async fn _execute(self, mut inputs: Vec) { - let Projection { exprs } = self; + pub async fn _execute(self) { + let Projection { exprs, input } = self; #[for_await] - for tuple in inputs.remove(0) { + for tuple in input { let tuple = tuple?; let mut columns = Vec::with_capacity(exprs.len()); diff --git a/src/execution/executor/dql/seq_scan.rs b/src/execution/executor/dql/seq_scan.rs index 7cf15722..00684d6b 100644 --- a/src/execution/executor/dql/seq_scan.rs +++ b/src/execution/executor/dql/seq_scan.rs @@ -17,7 +17,7 @@ impl From for SeqScan { } impl Executor for SeqScan { - fn execute(self, _inputs: Vec, transaction: &RefCell) -> BoxedExecutor { + fn execute(self, transaction: &RefCell) -> BoxedExecutor { unsafe { self._execute(transaction.as_ptr().as_ref().unwrap()) } } } diff --git a/src/execution/executor/dql/sort.rs b/src/execution/executor/dql/sort.rs index 37e9a84c..e1d25fd2 100644 --- a/src/execution/executor/dql/sort.rs +++ b/src/execution/executor/dql/sort.rs @@ -10,28 +10,37 @@ use std::cmp::Ordering; pub struct Sort { sort_fields: Vec, limit: Option, + input: BoxedExecutor, } -impl From for Sort { - fn from(SortOperator { sort_fields, limit }: SortOperator) -> Sort { - Sort { sort_fields, limit } +impl From<(SortOperator, BoxedExecutor)> for Sort { + fn from((SortOperator { sort_fields, limit }, input): (SortOperator, BoxedExecutor)) -> Self { + Sort { + sort_fields, + limit, + input, + } } } impl Executor for Sort { - fn execute(self, inputs: Vec, _transaction: &RefCell) -> BoxedExecutor { - self._execute(inputs) + fn execute(self, _transaction: &RefCell) -> BoxedExecutor { + self._execute() } } impl Sort { #[try_stream(boxed, ok = Tuple, error = ExecutorError)] - pub async fn _execute(self, mut inputs: Vec) { - let Sort { sort_fields, limit } = self; + pub async fn _execute(self) { + let Sort { + sort_fields, + limit, + input, + } = self; let mut tuples: Vec = vec![]; #[for_await] - for tuple in inputs.remove(0) { + for tuple in input { tuples.push(tuple?); } diff --git a/src/execution/executor/dql/values.rs b/src/execution/executor/dql/values.rs index a294533c..80fae528 100644 --- a/src/execution/executor/dql/values.rs +++ b/src/execution/executor/dql/values.rs @@ -17,7 +17,7 @@ impl From for Values { } impl Executor for Values { - fn execute(self, _inputs: Vec, _transaction: &RefCell) -> BoxedExecutor { + fn execute(self, _transaction: &RefCell) -> BoxedExecutor { self._execute() } } diff --git a/src/execution/executor/mod.rs b/src/execution/executor/mod.rs index e8d53e66..fdc22b65 100644 --- a/src/execution/executor/mod.rs +++ b/src/execution/executor/mod.rs @@ -34,7 +34,7 @@ use std::cell::RefCell; pub type BoxedExecutor = BoxStream<'static, Result>; pub trait Executor { - fn execute(self, inputs: Vec, transaction: &RefCell) -> BoxedExecutor; + fn execute(self, transaction: &RefCell) -> BoxedExecutor; } pub fn build(plan: LogicalPlan, transaction: &RefCell) -> BoxedExecutor { @@ -44,71 +44,71 @@ pub fn build(plan: LogicalPlan, transaction: &RefCell) -> Box } = plan; match operator { - Operator::Dummy => Dummy {}.execute(vec![], transaction), + Operator::Dummy => Dummy {}.execute(transaction), Operator::Aggregate(op) => { let input = build(childrens.remove(0), transaction); if op.groupby_exprs.is_empty() { - SimpleAggExecutor::from(op).execute(vec![input], transaction) + SimpleAggExecutor::from((op, input)).execute(transaction) } else { - HashAggExecutor::from(op).execute(vec![input], transaction) + HashAggExecutor::from((op, input)).execute(transaction) } } Operator::Filter(op) => { let input = build(childrens.remove(0), transaction); - Filter::from(op).execute(vec![input], transaction) + Filter::from((op, input)).execute(transaction) } Operator::Join(op) => { let left_input = build(childrens.remove(0), transaction); let right_input = build(childrens.remove(0), transaction); - HashJoin::from(op).execute(vec![left_input, right_input], transaction) + HashJoin::from((op, left_input, right_input)).execute(transaction) } Operator::Project(op) => { let input = build(childrens.remove(0), transaction); - Projection::from(op).execute(vec![input], transaction) + Projection::from((op, input)).execute(transaction) } Operator::Scan(op) => { if op.index_by.is_some() { - IndexScan::from(op).execute(vec![], transaction) + IndexScan::from(op).execute(transaction) } else { - SeqScan::from(op).execute(vec![], transaction) + SeqScan::from(op).execute(transaction) } } Operator::Sort(op) => { let input = build(childrens.remove(0), transaction); - Sort::from(op).execute(vec![input], transaction) + Sort::from((op, input)).execute(transaction) } Operator::Limit(op) => { let input = build(childrens.remove(0), transaction); - Limit::from(op).execute(vec![input], transaction) + Limit::from((op, input)).execute(transaction) } Operator::Insert(op) => { let input = build(childrens.remove(0), transaction); - Insert::from(op).execute(vec![input], transaction) + Insert::from((op, input)).execute(transaction) } Operator::Update(op) => { let input = build(childrens.remove(0), transaction); let values = build(childrens.remove(0), transaction); - Update::from(op).execute(vec![input, values], transaction) + Update::from((op, input, values)).execute(transaction) } Operator::Delete(op) => { let input = build(childrens.remove(0), transaction); - Delete::from(op).execute(vec![input], transaction) + Delete::from((op, input)).execute(transaction) } - Operator::Values(op) => Values::from(op).execute(vec![], transaction), - Operator::CreateTable(op) => CreateTable::from(op).execute(vec![], transaction), - Operator::DropTable(op) => DropTable::from(op).execute(vec![], transaction), - Operator::Truncate(op) => Truncate::from(op).execute(vec![], transaction), - Operator::Show(op) => ShowTables::from(op).execute(vec![], transaction), - Operator::CopyFromFile(op) => CopyFromFile::from(op).execute(vec![], transaction), + Operator::Values(op) => Values::from(op).execute(transaction), + Operator::CreateTable(op) => CreateTable::from(op).execute(transaction), + Operator::DropTable(op) => DropTable::from(op).execute(transaction), + Operator::Truncate(op) => Truncate::from(op).execute(transaction), + Operator::Show(op) => ShowTables::from(op).execute(transaction), + Operator::CopyFromFile(op) => CopyFromFile::from(op).execute(transaction), #[warn(unused_assignments)] Operator::CopyToFile(_op) => { todo!() diff --git a/src/execution/executor/show/show_table.rs b/src/execution/executor/show/show_table.rs index fcc8d7cf..0e463428 100644 --- a/src/execution/executor/show/show_table.rs +++ b/src/execution/executor/show/show_table.rs @@ -21,7 +21,7 @@ impl From for ShowTables { } impl Executor for ShowTables { - fn execute(self, _inputs: Vec, transaction: &RefCell) -> BoxedExecutor { + fn execute(self, transaction: &RefCell) -> BoxedExecutor { unsafe { self._execute(transaction.as_ptr().as_ref().unwrap()) } } }