Skip to content

Commit

Permalink
perf: simplification of HashJoin and HashAgg
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Dec 7, 2024
1 parent 96cdeca commit cfab5b4
Show file tree
Hide file tree
Showing 2 changed files with 269 additions and 437 deletions.
135 changes: 41 additions & 94 deletions src/execution/dql/aggregate/hash_agg.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::catalog::ColumnRef;
use crate::errors::DatabaseError;
use crate::execution::dql::aggregate::{create_accumulators, Accumulator};
use crate::execution::{build_read, Executor, ReadExecutor};
Expand All @@ -7,10 +6,11 @@ use crate::planner::operator::aggregate::AggregateOperator;
use crate::planner::LogicalPlan;
use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache};
use crate::throw;
use crate::types::tuple::{SchemaRef, Tuple};
use crate::types::tuple::Tuple;
use crate::types::value::DataValue;
use ahash::HashMap;
use ahash::{HashMap, HashMapExt};
use itertools::Itertools;
use std::collections::hash_map::Entry;
use std::ops::{Coroutine, CoroutineState};
use std::pin::Pin;

Expand Down Expand Up @@ -54,109 +54,56 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for HashAggExecutor {
mut input,
} = self;

let mut agg_status =
HashAggStatus::new(input.output_schema().clone(), agg_calls, groupby_exprs);
let schema_ref = input.output_schema().clone();
let mut group_hash_accs: HashMap<Vec<DataValue>, Vec<Box<dyn Accumulator>>> =
HashMap::new();

let mut coroutine = build_read(input, cache, transaction);

while let CoroutineState::Yielded(result) = Pin::new(&mut coroutine).resume(()) {
throw!(agg_status.update(throw!(result)));
let tuple = throw!(result);
let mut values = Vec::with_capacity(agg_calls.len());

for expr in agg_calls.iter() {
if let ScalarExpression::AggCall { args, .. } = expr {
if args.len() > 1 {
throw!(Err(DatabaseError::UnsupportedStmt("currently aggregate functions only support a single Column as a parameter".to_string())))
}
values.push(throw!(args[0].eval(&tuple, &schema_ref)));
} else {
unreachable!()
}
}
let group_keys: Vec<DataValue> = throw!(groupby_exprs
.iter()
.map(|expr| expr.eval(&tuple, &schema_ref))
.try_collect());

let entry = match group_hash_accs.entry(group_keys) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(entry) => {
entry.insert(throw!(create_accumulators(&agg_calls)))
}
};
for (acc, value) in entry.iter_mut().zip_eq(values.iter()) {
throw!(acc.update_value(value));
}
}

for tuple in throw!(agg_status.as_tuples()) {
yield Ok(tuple);
for (group_keys, accs) in group_hash_accs {
// Tips: Accumulator First
let values: Vec<DataValue> = throw!(accs
.iter()
.map(|acc| acc.evaluate())
.chain(group_keys.into_iter().map(Ok))
.try_collect());
yield Ok(Tuple { id: None, values });
}
},
)
}
}

pub(crate) struct HashAggStatus {
schema_ref: SchemaRef,

agg_calls: Vec<ScalarExpression>,
groupby_exprs: Vec<ScalarExpression>,

group_columns: Vec<ColumnRef>,
group_hash_accs: HashMap<Vec<DataValue>, Vec<Box<dyn Accumulator>>>,
}

impl HashAggStatus {
pub(crate) fn new(
schema_ref: SchemaRef,
agg_calls: Vec<ScalarExpression>,
groupby_exprs: Vec<ScalarExpression>,
) -> Self {
HashAggStatus {
schema_ref,
agg_calls,
groupby_exprs,
group_columns: vec![],
group_hash_accs: Default::default(),
}
}

pub(crate) fn update(&mut self, tuple: Tuple) -> Result<(), DatabaseError> {
// 1. build group and agg columns for hash_agg columns.
// Tips: AggCall First
if self.group_columns.is_empty() {
self.group_columns = self
.agg_calls
.iter()
.chain(self.groupby_exprs.iter())
.map(|expr| expr.output_column())
.collect_vec();
}

// 2.1 evaluate agg exprs and collect the result values for later accumulators.
let values: Vec<DataValue> = self
.agg_calls
.iter()
.map(|expr| {
if let ScalarExpression::AggCall { args, .. } = expr {
args[0].eval(&tuple, &self.schema_ref)
} else {
unreachable!()
}
})
.try_collect()?;

let group_keys: Vec<DataValue> = self
.groupby_exprs
.iter()
.map(|expr| expr.eval(&tuple, &self.schema_ref))
.try_collect()?;

for (acc, value) in self
.group_hash_accs
.entry(group_keys)
.or_insert_with(|| create_accumulators(&self.agg_calls).unwrap())
.iter_mut()
.zip_eq(values.iter())
{
acc.update_value(value)?;
}

Ok(())
}

pub(crate) fn as_tuples(&mut self) -> Result<Vec<Tuple>, DatabaseError> {
self.group_hash_accs
.drain()
.map(|(group_keys, accs)| {
// Tips: Accumulator First
let values: Vec<DataValue> = accs
.iter()
.map(|acc| acc.evaluate())
.chain(group_keys.into_iter().map(Ok))
.try_collect()?;

Ok::<Tuple, DatabaseError>(Tuple { id: None, values })
})
.try_collect()
}
}

#[cfg(test)]
mod test {
use crate::catalog::{ColumnCatalog, ColumnDesc, ColumnRef};
Expand Down
Loading

0 comments on commit cfab5b4

Please sign in to comment.