Skip to content

Commit

Permalink
perf(over window): incremental aggregation (new) (#13038)
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Oct 31, 2023
1 parent 9616cbf commit f18e73e
Show file tree
Hide file tree
Showing 12 changed files with 447 additions and 182 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions src/batch/src/executor/sort_over_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,11 @@ impl SortOverWindowExecutor {
}
}
for row in rows.drain(..) {
if let Some(chunk) =
chunk_builder.append_one_row(row.chain(OwnedRow::new(states.curr_output()?)))
if let Some(chunk) = chunk_builder
.append_one_row(row.chain(OwnedRow::new(states.slide_no_evict_hint()?)))
{
yield chunk;
}
states.just_slide_forward();
}
}
}
1 change: 1 addition & 0 deletions src/expr/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ ctor = "0.2"
downcast-rs = "1.2"
easy-ext = "1"
either = "1"
enum-as-inner = "0.6"
futures-async-stream = { workspace = true }
futures-util = "0.3"
itertools = "0.11"
Expand Down
3 changes: 2 additions & 1 deletion src/expr/core/src/window_function/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::cmp::Ordering;
use std::fmt::Display;

use enum_as_inner::EnumAsInner;
use risingwave_common::bail;
use risingwave_common::types::DataType;
use risingwave_pb::expr::window_frame::{PbBound, PbExclusion};
Expand Down Expand Up @@ -267,7 +268,7 @@ impl FrameBound<usize> {
}
}

#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Default)]
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Default, EnumAsInner)]
pub enum FrameExclusion {
CurrentRow,
// Group,
Expand Down
153 changes: 109 additions & 44 deletions src/expr/core/src/window_function/state/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::collections::BTreeSet;

use futures_util::FutureExt;
use risingwave_common::array::{DataChunk, StreamChunk};
use risingwave_common::array::{DataChunk, Op, StreamChunk};
use risingwave_common::estimate_size::{EstimateSize, KvSize};
use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::iter_util::ZipEqFast;
Expand All @@ -24,12 +24,16 @@ use smallvec::SmallVec;

use super::buffer::WindowBuffer;
use super::{StateEvictHint, StateKey, StatePos, WindowState};
use crate::aggregate::{build_append_only, AggArgs, AggCall, BoxedAggregateFunction};
use crate::aggregate::{
AggArgs, AggCall, AggregateFunction, AggregateState as AggImplState, BoxedAggregateFunction,
};
use crate::sig::FUNCTION_REGISTRY;
use crate::window_function::{WindowFuncCall, WindowFuncKind};
use crate::Result;

pub struct AggregateState {
agg_call: AggCall,
agg_func: BoxedAggregateFunction,
agg_impl: AggImpl,
arg_data_types: Vec<DataType>,
buffer: WindowBuffer<StateKey, SmallVec<[Datum; 2]>>,
buffer_heap_size: KvSize,
Expand Down Expand Up @@ -58,13 +62,54 @@ impl AggregateState {
distinct: false,
direct_args: vec![],
};
let agg_func_sig = FUNCTION_REGISTRY
.get_aggregate(
agg_kind,
&arg_data_types,
&call.return_type,
false, // means prefer retractable version
)
.expect("the agg func must exist");
let agg_func = agg_func_sig.build_aggregate(&agg_call)?;
let (agg_impl, enable_delta) =
if !agg_func_sig.append_only && call.frame.exclusion.is_no_others() {
let init_state = agg_func.create_state();
(AggImpl::Incremental(init_state), true)
} else {
(AggImpl::Full, false)
};
Ok(Self {
agg_call,
agg_func,
agg_impl,
arg_data_types,
buffer: WindowBuffer::new(call.frame.clone()),
buffer: WindowBuffer::new(call.frame.clone(), enable_delta),
buffer_heap_size: KvSize::new(),
})
}

fn slide_inner(&mut self) -> StateEvictHint {
let removed_keys: BTreeSet<_> = self
.buffer
.slide()
.map(|(k, v)| {
v.iter().for_each(|arg| {
self.buffer_heap_size.sub_val(arg);
});
self.buffer_heap_size.sub_val(&k);
k
})
.collect();
if removed_keys.is_empty() {
StateEvictHint::CannotEvict(
self.buffer
.smallest_key()
.expect("sliding without removing, must have some entry in the buffer")
.clone(),
)
} else {
StateEvictHint::CanEvict(removed_keys)
}
}
}

impl WindowState for AggregateState {
Expand All @@ -84,36 +129,35 @@ impl WindowState for AggregateState {
}
}

fn curr_output(&self) -> Result<Datum> {
fn slide(&mut self) -> Result<(Datum, StateEvictHint)> {
let wrapper = AggregatorWrapper {
agg: build_append_only(&self.agg_call)?,
agg_func: self.agg_func.as_ref(),
arg_data_types: &self.arg_data_types,
};
wrapper.aggregate(self.buffer.curr_window_values().map(SmallVec::as_slice))
let output = match self.agg_impl {
AggImpl::Full => wrapper.aggregate(self.buffer.curr_window_values()),
AggImpl::Incremental(ref mut state) => {
wrapper.update(state, self.buffer.consume_curr_window_values_delta())
}
}?;
let evict_hint = self.slide_inner();
Ok((output, evict_hint))
}

fn slide_forward(&mut self) -> StateEvictHint {
let removed_keys: BTreeSet<_> = self
.buffer
.slide()
.map(|(k, v)| {
v.iter().for_each(|arg| {
self.buffer_heap_size.sub_val(arg);
});
self.buffer_heap_size.sub_val(&k);
k
})
.collect();
if removed_keys.is_empty() {
StateEvictHint::CannotEvict(
self.buffer
.smallest_key()
.expect("sliding without removing, must have some entry in the buffer")
.clone(),
)
} else {
StateEvictHint::CanEvict(removed_keys)
}
fn slide_no_output(&mut self) -> Result<StateEvictHint> {
match self.agg_impl {
AggImpl::Full => {}
AggImpl::Incremental(ref mut state) => {
// for incremental agg, we need to update the state even if the caller doesn't need
// the output
let wrapper = AggregatorWrapper {
agg_func: self.agg_func.as_ref(),
arg_data_types: &self.arg_data_types,
};
wrapper.update(state, self.buffer.consume_curr_window_values_delta())?;
}
};
Ok(self.slide_inner())
}
}

Expand All @@ -125,41 +169,62 @@ impl EstimateSize for AggregateState {
}
}

enum AggImpl {
Incremental(AggImplState),
Full,
}

struct AggregatorWrapper<'a> {
agg: BoxedAggregateFunction,
agg_func: &'a dyn AggregateFunction,
arg_data_types: &'a [DataType],
}

impl AggregatorWrapper<'_> {
fn aggregate<'a>(&'a self, values: impl Iterator<Item = &'a [Datum]>) -> Result<Datum> {
// TODO(rc): switch to a better general version of aggregator implementation
fn aggregate<V>(&self, values: impl IntoIterator<Item = V>) -> Result<Datum>
where
V: AsRef<[Datum]>,
{
let mut state = self.agg_func.create_state();
self.update(
&mut state,
values.into_iter().map(|args| (Op::Insert, args)),
)
}

fn update<V>(
&self,
state: &mut AggImplState,
delta: impl IntoIterator<Item = (Op, V)>,
) -> Result<Datum>
where
V: AsRef<[Datum]>,
{
let mut args_builders = self
.arg_data_types
.iter()
.map(|data_type| data_type.create_array_builder(0 /* bad! */))
.collect::<Vec<_>>();
let mut n_values = 0;
for value in values {
n_values += 1;
for (builder, datum) in args_builders.iter_mut().zip_eq_fast(value.iter()) {
let mut ops = Vec::new();
let mut n_rows = 0;
for (op, value) in delta {
n_rows += 1;
ops.push(op);
for (builder, datum) in args_builders.iter_mut().zip_eq_fast(value.as_ref()) {
builder.append(datum);
}
}

let columns = args_builders
.into_iter()
.map(|builder| builder.finish().into())
.collect::<Vec<_>>();
let chunk = StreamChunk::from(DataChunk::new(columns, n_values));
let chunk = StreamChunk::from_parts(ops, DataChunk::new(columns, n_rows));

let mut state = self.agg.create_state();
self.agg
.update(&mut state, &chunk)
self.agg_func
.update(state, &chunk)
.now_or_never()
.expect("we don't support UDAF currently, so the function should return immediately")?;
self.agg
.get_result(&state)
self.agg_func
.get_result(state)
.now_or_never()
.expect("we don't support UDAF currently, so the function should return immediately")
}
Expand Down
Loading

0 comments on commit f18e73e

Please sign in to comment.