Skip to content

Commit

Permalink
fix(stream agg): fix agg backwards compatibility issue introduced by #…
Browse files Browse the repository at this point in the history
…13376 (#13571)

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Nov 22, 2023
1 parent baf1f92 commit ac0bb23
Show file tree
Hide file tree
Showing 9 changed files with 108 additions and 16 deletions.
3 changes: 3 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,9 @@ enum AggNodeVersion {
// https://github.com/risingwavelabs/risingwave/issues/12140#issuecomment-1776289808
AGG_NODE_VERSION_ISSUE_12140 = 1;

// https://github.com/risingwavelabs/risingwave/issues/13465#issuecomment-1821016508
AGG_NODE_VERSION_ISSUE_13465 = 2;

// Used for test only.
AGG_NODE_VERSION_MAX = 2147483647;
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ impl StreamNode for StreamHashAgg {
.collect(),
row_count_index: self.row_count_idx as u32,
emit_on_window_close: self.base.emit_on_window_close(),
version: PbAggNodeVersion::Issue12140 as _,
version: PbAggNodeVersion::Issue13465 as _,
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl StreamNode for StreamSimpleAgg {
})
.collect(),
row_count_index: self.row_count_idx as u32,
version: PbAggNodeVersion::Issue12140 as _,
version: PbAggNodeVersion::Issue13465 as _,
})
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl StreamNode for StreamStatelessSimpleAgg {
intermediate_state_table: None,
is_append_only: self.input().append_only(),
distinct_dedup_tables: Default::default(),
version: AggNodeVersion::Issue12140 as _,
version: AggNodeVersion::Issue13465 as _,
})
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/stream/src/executor/aggregation/agg_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use risingwave_storage::StateStore;
use super::agg_state::{AggState, AggStateStorage};
use crate::common::table::state_table::StateTable;
use crate::executor::error::StreamExecutorResult;
use crate::executor::PkIndices;

pub trait Strategy {
/// Infer the change type of the aggregation result. Don't need to take the ownership of
Expand Down Expand Up @@ -198,6 +199,7 @@ impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
agg_funcs: &[BoxedAggregateFunction],
storages: &[AggStateStorage<S>],
intermediate_state_table: &StateTable<S>,
pk_indices: &PkIndices,
row_count_index: usize,
extreme_cache_size: usize,
input_schema: &Schema,
Expand All @@ -217,6 +219,7 @@ impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
agg_func,
&storages[idx],
encoded_states.as_ref().map(|outputs| &outputs[idx]),
pk_indices,
extreme_cache_size,
input_schema,
)?;
Expand Down Expand Up @@ -248,6 +251,7 @@ impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
agg_funcs: &[BoxedAggregateFunction],
storages: &[AggStateStorage<S>],
encoded_states: &OwnedRow,
pk_indices: &PkIndices,
row_count_index: usize,
extreme_cache_size: usize,
input_schema: &Schema,
Expand All @@ -260,6 +264,7 @@ impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
agg_func,
&storages[idx],
Some(&encoded_states[idx]),
pk_indices,
extreme_cache_size,
input_schema,
)?;
Expand Down
4 changes: 3 additions & 1 deletion src/stream/src/executor/aggregation/agg_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use super::minput::MaterializedInputState;
use super::GroupKey;
use crate::common::table::state_table::StateTable;
use crate::common::StateTableColumnMapping;
use crate::executor::StreamExecutorResult;
use crate::executor::{PkIndices, StreamExecutorResult};

/// Represents the persistent storage of aggregation state.
pub enum AggStateStorage<S: StateStore> {
Expand Down Expand Up @@ -73,6 +73,7 @@ impl AggState {
agg_func: &BoxedAggregateFunction,
storage: &AggStateStorage<impl StateStore>,
encoded_state: Option<&Datum>,
pk_indices: &PkIndices,
extreme_cache_size: usize,
input_schema: &Schema,
) -> StreamExecutorResult<Self> {
Expand All @@ -91,6 +92,7 @@ impl AggState {
} => Self::MaterializedInput(Box::new(MaterializedInputState::new(
version,
agg_call,
pk_indices,
order_columns,
mapping,
extreme_cache_size,
Expand Down
95 changes: 83 additions & 12 deletions src/stream/src/executor/aggregation/minput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use risingwave_common::estimate_size::EstimateSize;
use risingwave_common::row::{OwnedRow, RowExt};
use risingwave_common::types::Datum;
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_expr::aggregate::{AggCall, AggKind, BoxedAggregateFunction};
use risingwave_pb::stream_plan::PbAggNodeVersion;
use risingwave_storage::store::PrefetchOptions;
Expand All @@ -34,7 +34,7 @@ use super::GroupKey;
use crate::common::cache::{OrderedStateCache, TopNStateCache};
use crate::common::table::state_table::StateTable;
use crate::common::StateTableColumnMapping;
use crate::executor::StreamExecutorResult;
use crate::executor::{PkIndices, StreamExecutorResult};

/// Aggregation state as a materialization of input chunks.
///
Expand Down Expand Up @@ -71,25 +71,33 @@ impl MaterializedInputState {
pub fn new(
version: PbAggNodeVersion,
agg_call: &AggCall,
order_columns: &Vec<ColumnOrder>,
pk_indices: &PkIndices,
order_columns: &[ColumnOrder],
col_mapping: &StateTableColumnMapping,
extreme_cache_size: usize,
input_schema: &Schema,
) -> StreamExecutorResult<Self> {
let arg_col_indices = agg_call.args.val_indices().to_vec();
let mut order_col_indices = vec![];
let mut order_types = vec![];
for o in order_columns {
order_col_indices.push(o.column_index);
order_types.push(o.order_type);
}

if agg_call.distinct && version < PbAggNodeVersion::Issue12140 {
panic!(
"RisingWave versions before issue #12140 is resolved has critical bug, you must re-create current MV to ensure correctness."
);
}

let arg_col_indices = agg_call.args.val_indices().to_vec();

let (order_col_indices, order_types) = if version < PbAggNodeVersion::Issue13465 {
generate_order_columns_before_version_issue_13465(
agg_call,
pk_indices,
&arg_col_indices,
)
} else {
order_columns
.iter()
.map(|o| (o.column_index, o.order_type))
.unzip()
};

// map argument columns to state table column indices
let state_table_arg_col_indices = arg_col_indices
.iter()
Expand Down Expand Up @@ -224,6 +232,57 @@ impl MaterializedInputState {
}
}

/// Copied from old code before <https://github.com/risingwavelabs/risingwave/commit/0020507edbc4010b20aeeb560c7bea9159315602>.
fn generate_order_columns_before_version_issue_13465(
agg_call: &AggCall,
pk_indices: &PkIndices,
arg_col_indices: &[usize],
) -> (Vec<usize>, Vec<OrderType>) {
let (mut order_col_indices, mut order_types) =
if matches!(agg_call.kind, AggKind::Min | AggKind::Max) {
// `min`/`max` need not to order by any other columns, but have to
// order by the agg value implicitly.
let order_type = if agg_call.kind == AggKind::Min {
OrderType::ascending()
} else {
OrderType::descending()
};
(vec![arg_col_indices[0]], vec![order_type])
} else {
agg_call
.column_orders
.iter()
.map(|p| {
(
p.column_index,
if agg_call.kind == AggKind::LastValue {
p.order_type.reverse()
} else {
p.order_type
},
)
})
.unzip()
};

if agg_call.distinct {
// If distinct, we need to materialize input with the distinct keys
// As we only support single-column distinct for now, we use the
// `agg_call.args.val_indices()[0]` as the distinct key.
if !order_col_indices.contains(&agg_call.args.val_indices()[0]) {
order_col_indices.push(agg_call.args.val_indices()[0]);
order_types.push(OrderType::ascending());
}
} else {
// If not distinct, we need to materialize input with the primary keys
let pk_len = pk_indices.len();
order_col_indices.extend(pk_indices.iter());
order_types.extend(itertools::repeat_n(OrderType::ascending(), pk_len));
}

(order_col_indices, order_types)
}

#[cfg(test)]
mod tests {
use std::collections::HashSet;
Expand All @@ -247,7 +306,7 @@ mod tests {
use crate::common::table::state_table::StateTable;
use crate::common::StateTableColumnMapping;
use crate::executor::aggregation::GroupKey;
use crate::executor::StreamExecutorResult;
use crate::executor::{PkIndices, StreamExecutorResult};

fn create_chunk<S: StateStore>(
pretty: &str,
Expand Down Expand Up @@ -317,6 +376,7 @@ mod tests {
let mut state = MaterializedInputState::new(
PbAggNodeVersion::Max,
&agg_call,
&PkIndices::new(), // unused
&order_columns,
&mapping,
usize::MAX,
Expand Down Expand Up @@ -370,6 +430,7 @@ mod tests {
let mut state = MaterializedInputState::new(
PbAggNodeVersion::Max,
&agg_call,
&PkIndices::new(), // unused
&order_columns,
&mapping,
usize::MAX,
Expand Down Expand Up @@ -415,6 +476,7 @@ mod tests {
let mut state = MaterializedInputState::new(
PbAggNodeVersion::Max,
&agg_call,
&PkIndices::new(), // unused
&order_columns,
&mapping,
usize::MAX,
Expand Down Expand Up @@ -468,6 +530,7 @@ mod tests {
let mut state = MaterializedInputState::new(
PbAggNodeVersion::Max,
&agg_call,
&PkIndices::new(), // unused
&order_columns,
&mapping,
usize::MAX,
Expand Down Expand Up @@ -529,6 +592,7 @@ mod tests {
let mut state_1 = MaterializedInputState::new(
PbAggNodeVersion::Max,
&agg_call_1,
&PkIndices::new(), // unused
&order_columns_1,
&mapping_1,
usize::MAX,
Expand All @@ -543,6 +607,7 @@ mod tests {
let mut state_2 = MaterializedInputState::new(
PbAggNodeVersion::Max,
&agg_call_2,
&PkIndices::new(), // unused
&order_columns_2,
&mapping_2,
usize::MAX,
Expand Down Expand Up @@ -630,6 +695,7 @@ mod tests {
let mut state = MaterializedInputState::new(
PbAggNodeVersion::Max,
&agg_call,
&PkIndices::new(), // unused
&order_columns,
&mapping,
usize::MAX,
Expand Down Expand Up @@ -682,6 +748,7 @@ mod tests {
let mut state = MaterializedInputState::new(
PbAggNodeVersion::Max,
&agg_call,
&PkIndices::new(), // unused
&order_columns,
&mapping,
usize::MAX,
Expand Down Expand Up @@ -729,6 +796,7 @@ mod tests {
let mut state = MaterializedInputState::new(
PbAggNodeVersion::Max,
&agg_call,
&PkIndices::new(), // unused
&order_columns,
&mapping,
1024,
Expand Down Expand Up @@ -832,6 +900,7 @@ mod tests {
let mut state = MaterializedInputState::new(
PbAggNodeVersion::Max,
&agg_call,
&PkIndices::new(), // unused
&order_columns,
&mapping,
3, // cache capacity = 3 for easy testing
Expand Down Expand Up @@ -944,6 +1013,7 @@ mod tests {
let mut state = MaterializedInputState::new(
PbAggNodeVersion::Max,
&agg_call,
&PkIndices::new(), // unused
&order_columns,
&mapping,
usize::MAX,
Expand Down Expand Up @@ -1028,6 +1098,7 @@ mod tests {
let mut state = MaterializedInputState::new(
PbAggNodeVersion::Max,
&agg_call,
&PkIndices::new(), // unused
&order_columns,
&mapping,
usize::MAX,
Expand Down
6 changes: 6 additions & 0 deletions src/stream/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ struct ExecutorInner<K: HashKey, S: StateStore> {
actor_ctx: ActorContextRef,
info: ExecutorInfo,

/// Pk indices from input. Only used by `AggNodeVersion` before `ISSUE_13465`.
input_pk_indices: Vec<usize>,

/// Schema from input.
input_schema: Schema,

Expand Down Expand Up @@ -232,6 +235,7 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
version: args.version,
actor_ctx: args.actor_ctx,
info: args.info,
input_pk_indices: input_info.pk_indices,
input_schema: input_info.schema,
group_key_indices: args.extra.group_key_indices,
group_key_table_pk_projection: group_key_table_pk_projection.to_vec().into(),
Expand Down Expand Up @@ -319,6 +323,7 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
&this.agg_funcs,
&this.storages,
&this.intermediate_state_table,
&this.input_pk_indices,
this.row_count_index,
this.extreme_cache_size,
&this.input_schema,
Expand Down Expand Up @@ -472,6 +477,7 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
&this.agg_funcs,
&this.storages,
&states,
&this.input_pk_indices,
this.row_count_index,
this.extreme_cache_size,
&this.input_schema,
Expand Down
5 changes: 5 additions & 0 deletions src/stream/src/executor/simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ struct ExecutorInner<S: StateStore> {
actor_ctx: ActorContextRef,
info: ExecutorInfo,

/// Pk indices from input. Only used by `AggNodeVersion` before `ISSUE_13465`.
input_pk_indices: Vec<usize>,

/// Schema from input.
input_schema: Schema,

Expand Down Expand Up @@ -138,6 +141,7 @@ impl<S: StateStore> SimpleAggExecutor<S> {
version: args.version,
actor_ctx: args.actor_ctx,
info: args.info,
input_pk_indices: input_info.pk_indices,
input_schema: input_info.schema,
agg_funcs: args.agg_calls.iter().map(build_retractable).try_collect()?,
agg_calls: args.agg_calls,
Expand Down Expand Up @@ -275,6 +279,7 @@ impl<S: StateStore> SimpleAggExecutor<S> {
&this.agg_funcs,
&this.storages,
&this.intermediate_state_table,
&this.input_pk_indices,
this.row_count_index,
this.extreme_cache_size,
&this.input_schema,
Expand Down

0 comments on commit ac0bb23

Please sign in to comment.