Skip to content

Commit

Permalink
style: inputs restore
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Oct 7, 2023
1 parent 2854fde commit dd1268f
Show file tree
Hide file tree
Showing 20 changed files with 191 additions and 130 deletions.
2 changes: 1 addition & 1 deletion src/execution/executor/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl From<CreateTableOperator> for CreateTable {
}

impl<T: Transaction> Executor<T> for CreateTable {
fn execute(self, _inputs: Vec<BoxedExecutor>, transaction: &RefCell<T>) -> BoxedExecutor {
fn execute(self, transaction: &RefCell<T>) -> BoxedExecutor {
unsafe { self._execute(transaction.as_ptr().as_mut().unwrap()) }
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/execution/executor/ddl/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl From<DropTableOperator> for DropTable {
}

impl<T: Transaction> Executor<T> for DropTable {
fn execute(self, _inputs: Vec<BoxedExecutor>, transaction: &RefCell<T>) -> BoxedExecutor {
fn execute(self, transaction: &RefCell<T>) -> BoxedExecutor {
unsafe { self._execute(transaction.as_ptr().as_mut().unwrap()) }
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/execution/executor/ddl/truncate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl From<TruncateOperator> for Truncate {
}

impl<T: Transaction> Executor<T> for Truncate {
fn execute(self, _inputs: Vec<BoxedExecutor>, transaction: &RefCell<T>) -> BoxedExecutor {
fn execute(self, transaction: &RefCell<T>) -> BoxedExecutor {
unsafe { self._execute(transaction.as_ptr().as_mut().unwrap()) }
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/execution/executor/dml/copy_from_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl From<CopyFromFileOperator> for CopyFromFile {
}

impl<T: Transaction> Executor<T> for CopyFromFile {
fn execute(self, _inputs: Vec<BoxedExecutor>, transaction: &RefCell<T>) -> BoxedExecutor {
fn execute(self, transaction: &RefCell<T>) -> BoxedExecutor {
unsafe { self._execute(transaction.as_ptr().as_mut().unwrap()) }
}
}
Expand Down Expand Up @@ -195,7 +195,7 @@ mod tests {
let storage = db.storage;
let transaction = RefCell::new(storage.transaction().await?);
let actual = executor
.execute(vec![], &transaction)
.execute(&transaction)
.next()
.await
.unwrap()?;
Expand Down
17 changes: 9 additions & 8 deletions src/execution/executor/dml/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,25 @@ use std::cell::RefCell;

pub struct Delete {
table_name: TableName,
input: BoxedExecutor,
}

impl From<DeleteOperator> for Delete {
fn from(DeleteOperator { table_name }: DeleteOperator) -> Delete {
Delete { table_name }
impl From<(DeleteOperator, BoxedExecutor)> for Delete {
fn from((DeleteOperator { table_name }, input): (DeleteOperator, BoxedExecutor)) -> Self {
Delete { table_name, input }
}
}

impl<T: Transaction> Executor<T> for Delete {
fn execute(self, inputs: Vec<BoxedExecutor>, transaction: &RefCell<T>) -> BoxedExecutor {
unsafe { self._execute(transaction.as_ptr().as_mut().unwrap(), inputs) }
fn execute(self, transaction: &RefCell<T>) -> BoxedExecutor {
unsafe { self._execute(transaction.as_ptr().as_mut().unwrap()) }
}
}

impl Delete {
#[try_stream(boxed, ok = Tuple, error = ExecutorError)]
async fn _execute<T: Transaction>(self, transaction: &mut T, mut inputs: Vec<BoxedExecutor>) {
let Delete { table_name } = self;
async fn _execute<T: Transaction>(self, transaction: &mut T) {
let Delete { table_name, input } = self;
let option_index_metas = transaction.table(&table_name).map(|table_catalog| {
table_catalog
.all_columns()
Expand All @@ -51,7 +52,7 @@ impl Delete {

if let Some(index_metas) = option_index_metas {
#[for_await]
for tuple in inputs.remove(0) {
for tuple in input {
let tuple: Tuple = tuple?;

for (i, index_meta) in index_metas.iter() {
Expand Down
30 changes: 16 additions & 14 deletions src/execution/executor/dml/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,46 +13,48 @@ use std::sync::Arc;

pub struct Insert {
table_name: TableName,
input: BoxedExecutor,
is_overwrite: bool,
}

impl From<InsertOperator> for Insert {
impl From<(InsertOperator, BoxedExecutor)> for Insert {
fn from(
InsertOperator {
table_name,
is_overwrite,
}: InsertOperator,
) -> Insert {
(
InsertOperator {
table_name,
is_overwrite,
},
input,
): (InsertOperator, BoxedExecutor),
) -> Self {
Insert {
table_name,
input,
is_overwrite,
}
}
}

impl<T: Transaction> Executor<T> for Insert {
fn execute(self, inputs: Vec<BoxedExecutor>, transaction: &RefCell<T>) -> BoxedExecutor {
unsafe { self._execute(transaction.as_ptr().as_mut().unwrap(), inputs) }
fn execute(self, transaction: &RefCell<T>) -> BoxedExecutor {
unsafe { self._execute(transaction.as_ptr().as_mut().unwrap()) }
}
}

impl Insert {
#[try_stream(boxed, ok = Tuple, error = ExecutorError)]
pub async fn _execute<T: Transaction>(
self,
transaction: &mut T,
mut inputs: Vec<BoxedExecutor>,
) {
pub async fn _execute<T: Transaction>(self, transaction: &mut T) {
let Insert {
table_name,
input,
is_overwrite,
} = self;
let mut primary_key_index = None;
let mut unique_values = HashMap::new();

if let Some(table_catalog) = transaction.table(&table_name).cloned() {
#[for_await]
for tuple in inputs.remove(0) {
for tuple in input {
let Tuple {
columns, values, ..
} = tuple?;
Expand Down
38 changes: 25 additions & 13 deletions src/execution/executor/dml/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,47 @@ use std::collections::HashMap;

pub struct Update {
table_name: TableName,
input: BoxedExecutor,
values: BoxedExecutor,
}

impl From<UpdateOperator> for Update {
fn from(UpdateOperator { table_name }: UpdateOperator) -> Update {
Update { table_name }
impl From<(UpdateOperator, BoxedExecutor, BoxedExecutor)> for Update {
fn from(
(UpdateOperator { table_name }, input, values): (
UpdateOperator,
BoxedExecutor,
BoxedExecutor,
),
) -> Self {
Update {
table_name,
input,
values,
}
}
}

impl<T: Transaction> Executor<T> for Update {
fn execute(self, inputs: Vec<BoxedExecutor>, transaction: &RefCell<T>) -> BoxedExecutor {
unsafe { self._execute(transaction.as_ptr().as_mut().unwrap(), inputs) }
fn execute(self, transaction: &RefCell<T>) -> BoxedExecutor {
unsafe { self._execute(transaction.as_ptr().as_mut().unwrap()) }
}
}

impl Update {
#[try_stream(boxed, ok = Tuple, error = ExecutorError)]
pub async fn _execute<T: Transaction>(
self,
transaction: &mut T,
mut inputs: Vec<BoxedExecutor>,
) {
let Update { table_name } = self;
pub async fn _execute<T: Transaction>(self, transaction: &mut T, ) {
let Update {
table_name,
input,
values,
} = self;

if let Some(table_catalog) = transaction.table(&table_name).cloned() {
let mut value_map = HashMap::new();

// only once
#[for_await]
for tuple in inputs.remove(1) {
for tuple in values {
let Tuple {
columns, values, ..
} = tuple?;
Expand All @@ -48,7 +60,7 @@ impl Update {
}
}
#[for_await]
for tuple in inputs.remove(0) {
for tuple in input {
let mut tuple: Tuple = tuple?;
let mut is_overwrite = true;

Expand Down
29 changes: 17 additions & 12 deletions src/execution/executor/dql/aggregate/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,36 +14,41 @@ use std::cell::RefCell;
pub struct HashAggExecutor {
pub agg_calls: Vec<ScalarExpression>,
pub groupby_exprs: Vec<ScalarExpression>,
pub input: BoxedExecutor,
}

impl From<AggregateOperator> for HashAggExecutor {
impl From<(AggregateOperator, BoxedExecutor)> for HashAggExecutor {
fn from(
AggregateOperator {
agg_calls,
groupby_exprs,
}: AggregateOperator,
) -> HashAggExecutor {
(
AggregateOperator {
agg_calls,
groupby_exprs,
},
input,
): (AggregateOperator, BoxedExecutor),
) -> Self {
HashAggExecutor {
agg_calls,
groupby_exprs,
input,
}
}
}

impl<T: Transaction> Executor<T> for HashAggExecutor {
fn execute<'a>(self, inputs: Vec<BoxedExecutor>, _transaction: &RefCell<T>) -> BoxedExecutor {
self._execute(inputs)
fn execute<'a>(self, _transaction: &RefCell<T>) -> BoxedExecutor {
self._execute()
}
}

impl HashAggExecutor {
#[try_stream(boxed, ok = Tuple, error = ExecutorError)]
pub async fn _execute(self, mut inputs: Vec<BoxedExecutor>) {
pub async fn _execute(self) {
let mut group_and_agg_columns_option = None;
let mut group_hash_accs = HashMap::new();

#[for_await]
for tuple in inputs.remove(0) {
for tuple in self.input {
let tuple = tuple?;

// 1. build group and agg columns for hash_agg columns.
Expand Down Expand Up @@ -189,10 +194,10 @@ mod test {
],
columns: t1_columns,
})
.execute(vec![], &transaction);
.execute(&transaction);

let tuples =
try_collect(&mut HashAggExecutor::from(operator).execute(vec![input], &transaction))
try_collect(&mut HashAggExecutor::from((operator, input)).execute(&transaction))
.await?;

println!("hash_agg_test: \n{}", create_table(&tuples));
Expand Down
17 changes: 10 additions & 7 deletions src/execution/executor/dql/aggregate/simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,31 @@ use std::cell::RefCell;

pub struct SimpleAggExecutor {
pub agg_calls: Vec<ScalarExpression>,
pub input: BoxedExecutor,
}

impl From<AggregateOperator> for SimpleAggExecutor {
fn from(AggregateOperator { agg_calls, .. }: AggregateOperator) -> SimpleAggExecutor {
SimpleAggExecutor { agg_calls }
impl From<(AggregateOperator, BoxedExecutor)> for SimpleAggExecutor {
fn from(
(AggregateOperator { agg_calls, .. }, input): (AggregateOperator, BoxedExecutor),
) -> Self {
SimpleAggExecutor { agg_calls, input }
}
}

impl<T: Transaction> Executor<T> for SimpleAggExecutor {
fn execute(self, inputs: Vec<BoxedExecutor>, _transaction: &RefCell<T>) -> BoxedExecutor {
self._execute(inputs)
fn execute(self, _transaction: &RefCell<T>) -> BoxedExecutor {
self._execute()
}
}

impl SimpleAggExecutor {
#[try_stream(boxed, ok = Tuple, error = ExecutorError)]
pub async fn _execute(self, mut inputs: Vec<BoxedExecutor>) {
pub async fn _execute(self) {
let mut accs = create_accumulators(&self.agg_calls);
let mut columns_option = None;

#[for_await]
for tuple in inputs.remove(0) {
for tuple in self.input {
let tuple = tuple?;

columns_option.get_or_insert_with(|| {
Expand Down
2 changes: 1 addition & 1 deletion src/execution/executor/dql/dummy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::cell::RefCell;
pub struct Dummy {}

impl<T: Transaction> Executor<T> for Dummy {
fn execute<'a>(self, _inputs: Vec<BoxedExecutor>, _transaction: &RefCell<T>) -> BoxedExecutor {
fn execute<'a>(self, _transaction: &RefCell<T>) -> BoxedExecutor {
self._execute()
}
}
Expand Down
17 changes: 9 additions & 8 deletions src/execution/executor/dql/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,28 @@ use std::cell::RefCell;

pub struct Filter {
predicate: ScalarExpression,
input: BoxedExecutor,
}

impl From<FilterOperator> for Filter {
fn from(FilterOperator { predicate, .. }: FilterOperator) -> Filter {
Filter { predicate }
impl From<(FilterOperator, BoxedExecutor)> for Filter {
fn from((FilterOperator { predicate, .. }, input): (FilterOperator, BoxedExecutor)) -> Self {
Filter { predicate, input }
}
}

impl<T: Transaction> Executor<T> for Filter {
fn execute(self, inputs: Vec<BoxedExecutor>, _transaction: &RefCell<T>) -> BoxedExecutor {
self._execute(inputs)
fn execute(self, _transaction: &RefCell<T>) -> BoxedExecutor {
self._execute()
}
}

impl Filter {
#[try_stream(boxed, ok = Tuple, error = ExecutorError)]
pub async fn _execute(self, mut inputs: Vec<BoxedExecutor>) {
let Filter { predicate } = self;
pub async fn _execute(self) {
let Filter { predicate, input } = self;

#[for_await]
for tuple in inputs.remove(0) {
for tuple in input {
let tuple = tuple?;
if let DataValue::Boolean(option) = predicate.eval_column(&tuple)?.as_ref() {
if let Some(true) = option {
Expand Down
2 changes: 1 addition & 1 deletion src/execution/executor/dql/index_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl From<ScanOperator> for IndexScan {
}

impl<T: Transaction> Executor<T> for IndexScan {
fn execute(self, _inputs: Vec<BoxedExecutor>, transaction: &RefCell<T>) -> BoxedExecutor {
fn execute(self, transaction: &RefCell<T>) -> BoxedExecutor {
unsafe { self._execute(transaction.as_ptr().as_ref().unwrap()) }
}
}
Expand Down
Loading

0 comments on commit dd1268f

Please sign in to comment.