diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 1ff51fe22cfb0..4d0f989ba6ea0 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -285,6 +285,9 @@ enum AggNodeVersion { // https://github.com/risingwavelabs/risingwave/issues/12140#issuecomment-1776289808 AGG_NODE_VERSION_ISSUE_12140 = 1; + // https://github.com/risingwavelabs/risingwave/issues/13465#issuecomment-1821016508 + AGG_NODE_VERSION_ISSUE_13465 = 2; + // Used for test only. AGG_NODE_VERSION_MAX = 2147483647; } diff --git a/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs b/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs index cb181677c6aaa..82d50bb4e830f 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs @@ -217,7 +217,7 @@ impl StreamNode for StreamHashAgg { .collect(), row_count_index: self.row_count_idx as u32, emit_on_window_close: self.base.emit_on_window_close(), - version: PbAggNodeVersion::Issue12140 as _, + version: PbAggNodeVersion::Issue13465 as _, }) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs index ff590e9695bb9..e633f08f216b8 100644 --- a/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs @@ -128,7 +128,7 @@ impl StreamNode for StreamSimpleAgg { }) .collect(), row_count_index: self.row_count_idx as u32, - version: PbAggNodeVersion::Issue12140 as _, + version: PbAggNodeVersion::Issue13465 as _, }) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs index e9db33b13b626..905900445bce0 100644 --- a/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs @@ -100,7 +100,7 @@ impl StreamNode for StreamStatelessSimpleAgg { intermediate_state_table: None, is_append_only: self.input().append_only(), distinct_dedup_tables: Default::default(), - version: AggNodeVersion::Issue12140 as _, + version: AggNodeVersion::Issue13465 as _, }) } } diff --git a/src/stream/src/executor/aggregation/agg_group.rs b/src/stream/src/executor/aggregation/agg_group.rs index aea954f2095a2..d0e97cd4783e9 100644 --- a/src/stream/src/executor/aggregation/agg_group.rs +++ b/src/stream/src/executor/aggregation/agg_group.rs @@ -31,6 +31,7 @@ use risingwave_storage::StateStore; use super::agg_state::{AggState, AggStateStorage}; use crate::common::table::state_table::StateTable; use crate::executor::error::StreamExecutorResult; +use crate::executor::PkIndices; pub trait Strategy { /// Infer the change type of the aggregation result. Don't need to take the ownership of @@ -198,6 +199,7 @@ impl AggGroup { agg_funcs: &[BoxedAggregateFunction], storages: &[AggStateStorage], intermediate_state_table: &StateTable, + pk_indices: &PkIndices, row_count_index: usize, extreme_cache_size: usize, input_schema: &Schema, @@ -217,6 +219,7 @@ impl AggGroup { agg_func, &storages[idx], encoded_states.as_ref().map(|outputs| &outputs[idx]), + pk_indices, extreme_cache_size, input_schema, )?; @@ -248,6 +251,7 @@ impl AggGroup { agg_funcs: &[BoxedAggregateFunction], storages: &[AggStateStorage], encoded_states: &OwnedRow, + pk_indices: &PkIndices, row_count_index: usize, extreme_cache_size: usize, input_schema: &Schema, @@ -260,6 +264,7 @@ impl AggGroup { agg_func, &storages[idx], Some(&encoded_states[idx]), + pk_indices, extreme_cache_size, input_schema, )?; diff --git a/src/stream/src/executor/aggregation/agg_state.rs b/src/stream/src/executor/aggregation/agg_state.rs index 4b74587c3d8b8..aa72673718bf0 100644 --- a/src/stream/src/executor/aggregation/agg_state.rs +++ b/src/stream/src/executor/aggregation/agg_state.rs @@ -27,7 +27,7 @@ use super::minput::MaterializedInputState; use super::GroupKey; use crate::common::table::state_table::StateTable; use crate::common::StateTableColumnMapping; -use crate::executor::StreamExecutorResult; +use crate::executor::{PkIndices, StreamExecutorResult}; /// Represents the persistent storage of aggregation state. pub enum AggStateStorage { @@ -73,6 +73,7 @@ impl AggState { agg_func: &BoxedAggregateFunction, storage: &AggStateStorage, encoded_state: Option<&Datum>, + pk_indices: &PkIndices, extreme_cache_size: usize, input_schema: &Schema, ) -> StreamExecutorResult { @@ -91,6 +92,7 @@ impl AggState { } => Self::MaterializedInput(Box::new(MaterializedInputState::new( version, agg_call, + pk_indices, order_columns, mapping, extreme_cache_size, diff --git a/src/stream/src/executor/aggregation/minput.rs b/src/stream/src/executor/aggregation/minput.rs index e90058a7ea720..327ff909b4765 100644 --- a/src/stream/src/executor/aggregation/minput.rs +++ b/src/stream/src/executor/aggregation/minput.rs @@ -23,7 +23,7 @@ use risingwave_common::estimate_size::EstimateSize; use risingwave_common::row::{OwnedRow, RowExt}; use risingwave_common::types::Datum; use risingwave_common::util::row_serde::OrderedRowSerde; -use risingwave_common::util::sort_util::ColumnOrder; +use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_expr::aggregate::{AggCall, AggKind, BoxedAggregateFunction}; use risingwave_pb::stream_plan::PbAggNodeVersion; use risingwave_storage::store::PrefetchOptions; @@ -34,7 +34,7 @@ use super::GroupKey; use crate::common::cache::{OrderedStateCache, TopNStateCache}; use crate::common::table::state_table::StateTable; use crate::common::StateTableColumnMapping; -use crate::executor::StreamExecutorResult; +use crate::executor::{PkIndices, StreamExecutorResult}; /// Aggregation state as a materialization of input chunks. /// @@ -71,25 +71,33 @@ impl MaterializedInputState { pub fn new( version: PbAggNodeVersion, agg_call: &AggCall, - order_columns: &Vec, + pk_indices: &PkIndices, + order_columns: &[ColumnOrder], col_mapping: &StateTableColumnMapping, extreme_cache_size: usize, input_schema: &Schema, ) -> StreamExecutorResult { - let arg_col_indices = agg_call.args.val_indices().to_vec(); - let mut order_col_indices = vec![]; - let mut order_types = vec![]; - for o in order_columns { - order_col_indices.push(o.column_index); - order_types.push(o.order_type); - } - if agg_call.distinct && version < PbAggNodeVersion::Issue12140 { panic!( "RisingWave versions before issue #12140 is resolved has critical bug, you must re-create current MV to ensure correctness." ); } + let arg_col_indices = agg_call.args.val_indices().to_vec(); + + let (order_col_indices, order_types) = if version < PbAggNodeVersion::Issue13465 { + generate_order_columns_before_version_issue_13465( + agg_call, + pk_indices, + &arg_col_indices, + ) + } else { + order_columns + .iter() + .map(|o| (o.column_index, o.order_type)) + .unzip() + }; + // map argument columns to state table column indices let state_table_arg_col_indices = arg_col_indices .iter() @@ -224,6 +232,57 @@ impl MaterializedInputState { } } +/// Copied from old code before . +fn generate_order_columns_before_version_issue_13465( + agg_call: &AggCall, + pk_indices: &PkIndices, + arg_col_indices: &[usize], +) -> (Vec, Vec) { + let (mut order_col_indices, mut order_types) = + if matches!(agg_call.kind, AggKind::Min | AggKind::Max) { + // `min`/`max` need not to order by any other columns, but have to + // order by the agg value implicitly. + let order_type = if agg_call.kind == AggKind::Min { + OrderType::ascending() + } else { + OrderType::descending() + }; + (vec![arg_col_indices[0]], vec![order_type]) + } else { + agg_call + .column_orders + .iter() + .map(|p| { + ( + p.column_index, + if agg_call.kind == AggKind::LastValue { + p.order_type.reverse() + } else { + p.order_type + }, + ) + }) + .unzip() + }; + + if agg_call.distinct { + // If distinct, we need to materialize input with the distinct keys + // As we only support single-column distinct for now, we use the + // `agg_call.args.val_indices()[0]` as the distinct key. + if !order_col_indices.contains(&agg_call.args.val_indices()[0]) { + order_col_indices.push(agg_call.args.val_indices()[0]); + order_types.push(OrderType::ascending()); + } + } else { + // If not distinct, we need to materialize input with the primary keys + let pk_len = pk_indices.len(); + order_col_indices.extend(pk_indices.iter()); + order_types.extend(itertools::repeat_n(OrderType::ascending(), pk_len)); + } + + (order_col_indices, order_types) +} + #[cfg(test)] mod tests { use std::collections::HashSet; @@ -247,7 +306,7 @@ mod tests { use crate::common::table::state_table::StateTable; use crate::common::StateTableColumnMapping; use crate::executor::aggregation::GroupKey; - use crate::executor::StreamExecutorResult; + use crate::executor::{PkIndices, StreamExecutorResult}; fn create_chunk( pretty: &str, @@ -317,6 +376,7 @@ mod tests { let mut state = MaterializedInputState::new( PbAggNodeVersion::Max, &agg_call, + &PkIndices::new(), // unused &order_columns, &mapping, usize::MAX, @@ -370,6 +430,7 @@ mod tests { let mut state = MaterializedInputState::new( PbAggNodeVersion::Max, &agg_call, + &PkIndices::new(), // unused &order_columns, &mapping, usize::MAX, @@ -415,6 +476,7 @@ mod tests { let mut state = MaterializedInputState::new( PbAggNodeVersion::Max, &agg_call, + &PkIndices::new(), // unused &order_columns, &mapping, usize::MAX, @@ -468,6 +530,7 @@ mod tests { let mut state = MaterializedInputState::new( PbAggNodeVersion::Max, &agg_call, + &PkIndices::new(), // unused &order_columns, &mapping, usize::MAX, @@ -529,6 +592,7 @@ mod tests { let mut state_1 = MaterializedInputState::new( PbAggNodeVersion::Max, &agg_call_1, + &PkIndices::new(), // unused &order_columns_1, &mapping_1, usize::MAX, @@ -543,6 +607,7 @@ mod tests { let mut state_2 = MaterializedInputState::new( PbAggNodeVersion::Max, &agg_call_2, + &PkIndices::new(), // unused &order_columns_2, &mapping_2, usize::MAX, @@ -630,6 +695,7 @@ mod tests { let mut state = MaterializedInputState::new( PbAggNodeVersion::Max, &agg_call, + &PkIndices::new(), // unused &order_columns, &mapping, usize::MAX, @@ -682,6 +748,7 @@ mod tests { let mut state = MaterializedInputState::new( PbAggNodeVersion::Max, &agg_call, + &PkIndices::new(), // unused &order_columns, &mapping, usize::MAX, @@ -729,6 +796,7 @@ mod tests { let mut state = MaterializedInputState::new( PbAggNodeVersion::Max, &agg_call, + &PkIndices::new(), // unused &order_columns, &mapping, 1024, @@ -832,6 +900,7 @@ mod tests { let mut state = MaterializedInputState::new( PbAggNodeVersion::Max, &agg_call, + &PkIndices::new(), // unused &order_columns, &mapping, 3, // cache capacity = 3 for easy testing @@ -944,6 +1013,7 @@ mod tests { let mut state = MaterializedInputState::new( PbAggNodeVersion::Max, &agg_call, + &PkIndices::new(), // unused &order_columns, &mapping, usize::MAX, @@ -1028,6 +1098,7 @@ mod tests { let mut state = MaterializedInputState::new( PbAggNodeVersion::Max, &agg_call, + &PkIndices::new(), // unused &order_columns, &mapping, usize::MAX, diff --git a/src/stream/src/executor/hash_agg.rs b/src/stream/src/executor/hash_agg.rs index e2fad54cbb2d0..fcab61276fd2f 100644 --- a/src/stream/src/executor/hash_agg.rs +++ b/src/stream/src/executor/hash_agg.rs @@ -88,6 +88,9 @@ struct ExecutorInner { actor_ctx: ActorContextRef, info: ExecutorInfo, + /// Pk indices from input. Only used by `AggNodeVersion` before `ISSUE_13465`. + input_pk_indices: Vec, + /// Schema from input. input_schema: Schema, @@ -232,6 +235,7 @@ impl HashAggExecutor { version: args.version, actor_ctx: args.actor_ctx, info: args.info, + input_pk_indices: input_info.pk_indices, input_schema: input_info.schema, group_key_indices: args.extra.group_key_indices, group_key_table_pk_projection: group_key_table_pk_projection.to_vec().into(), @@ -319,6 +323,7 @@ impl HashAggExecutor { &this.agg_funcs, &this.storages, &this.intermediate_state_table, + &this.input_pk_indices, this.row_count_index, this.extreme_cache_size, &this.input_schema, @@ -472,6 +477,7 @@ impl HashAggExecutor { &this.agg_funcs, &this.storages, &states, + &this.input_pk_indices, this.row_count_index, this.extreme_cache_size, &this.input_schema, diff --git a/src/stream/src/executor/simple_agg.rs b/src/stream/src/executor/simple_agg.rs index e3dc8f842922b..0f81c4e522f2a 100644 --- a/src/stream/src/executor/simple_agg.rs +++ b/src/stream/src/executor/simple_agg.rs @@ -59,6 +59,9 @@ struct ExecutorInner { actor_ctx: ActorContextRef, info: ExecutorInfo, + /// Pk indices from input. Only used by `AggNodeVersion` before `ISSUE_13465`. + input_pk_indices: Vec, + /// Schema from input. input_schema: Schema, @@ -138,6 +141,7 @@ impl SimpleAggExecutor { version: args.version, actor_ctx: args.actor_ctx, info: args.info, + input_pk_indices: input_info.pk_indices, input_schema: input_info.schema, agg_funcs: args.agg_calls.iter().map(build_retractable).try_collect()?, agg_calls: args.agg_calls, @@ -275,6 +279,7 @@ impl SimpleAggExecutor { &this.agg_funcs, &this.storages, &this.intermediate_state_table, + &this.input_pk_indices, this.row_count_index, this.extreme_cache_size, &this.input_schema,