Skip to content

Commit

Permalink
add agg node version
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Oct 24, 2023
1 parent c010792 commit 534cbba
Show file tree
Hide file tree
Showing 13 changed files with 69 additions and 0 deletions.
12 changes: 12 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,16 @@ message AggCallState {
reserved "table_state";
}

enum AggNodeVersion {
AGG_NODE_VERSION_UNSPECIFIED = 0;

// https://github.com/risingwavelabs/risingwave/issues/12140#issuecomment-1776289808
AGG_NODE_VERSION_ISSUE_12140 = 1;

// max value of int32
AGG_NODE_VERSION_MAX = 2147483647;
}

message SimpleAggNode {
repeated expr.AggCall agg_calls = 1;
// Only used for stateless simple agg.
Expand All @@ -279,6 +289,7 @@ message SimpleAggNode {
bool is_append_only = 5;
map<uint32, catalog.Table> distinct_dedup_tables = 6;
uint32 row_count_index = 7;
AggNodeVersion version = 8;
}

message HashAggNode {
Expand All @@ -292,6 +303,7 @@ message HashAggNode {
map<uint32, catalog.Table> distinct_dedup_tables = 6;
uint32 row_count_index = 7;
bool emit_on_window_close = 8;
AggNodeVersion version = 9;
}

message TopNNode {
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ impl StreamNode for StreamHashAgg {
.collect(),
row_count_index: self.row_count_idx as u32,
emit_on_window_close: self.base.emit_on_window_close,
version: PbAggNodeVersion::Issue12140 as _,
})
}
}
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ impl StreamNode for StreamSimpleAgg {
})
.collect(),
row_count_index: self.row_count_idx as u32,
version: PbAggNodeVersion::Issue12140 as _,
})
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ impl StreamNode for StreamStatelessSimpleAgg {
intermediate_state_table: None,
is_append_only: self.input().append_only(),
distinct_dedup_tables: Default::default(),
version: AggNodeVersion::Issue12140 as _,
})
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/stream/src/executor/agg_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use risingwave_expr::aggregate::AggCall;
use risingwave_pb::stream_plan::PbAggNodeVersion;
use risingwave_storage::StateStore;

use super::aggregation::AggStateStorage;
Expand All @@ -27,6 +28,8 @@ use crate::task::AtomicU64Ref;

/// Arguments needed to construct an `XxxAggExecutor`.
pub struct AggExecutorArgs<S: StateStore, E: AggExecutorExtraArgs> {
pub version: PbAggNodeVersion,

// basic
pub input: Box<dyn Executor>,
pub actor_ctx: ActorContextRef,
Expand Down
5 changes: 5 additions & 0 deletions src/stream/src/executor/aggregation/agg_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use risingwave_common::must_match;
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_expr::aggregate::{AggCall, BoxedAggregateFunction};
use risingwave_pb::stream_plan::PbAggNodeVersion;
use risingwave_storage::StateStore;

use super::agg_state::{AggState, AggStateStorage};
Expand Down Expand Up @@ -192,6 +193,7 @@ impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
/// For [`crate::executor::SimpleAggExecutor`], the `group_key` should be `None`.
#[allow(clippy::too_many_arguments)]
pub async fn create(
version: PbAggNodeVersion,
group_key: Option<GroupKey>,
agg_calls: &[AggCall],
agg_funcs: &[BoxedAggregateFunction],
Expand All @@ -212,6 +214,7 @@ impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
let mut states = Vec::with_capacity(agg_calls.len());
for (idx, (agg_call, agg_func)) in agg_calls.iter().zip_eq_fast(agg_funcs).enumerate() {
let state = AggState::create(
version,
agg_call,
agg_func,
&storages[idx],
Expand Down Expand Up @@ -242,6 +245,7 @@ impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
/// Create a group from encoded states for EOWC. The previous output is set to `None`.
#[allow(clippy::too_many_arguments)]
pub fn create_eowc(
version: PbAggNodeVersion,
group_key: Option<GroupKey>,
agg_calls: &[AggCall],
agg_funcs: &[BoxedAggregateFunction],
Expand All @@ -255,6 +259,7 @@ impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
let mut states = Vec::with_capacity(agg_calls.len());
for (idx, (agg_call, agg_func)) in agg_calls.iter().zip_eq_fast(agg_funcs).enumerate() {
let state = AggState::create(
version,
agg_call,
agg_func,
&storages[idx],
Expand Down
3 changes: 3 additions & 0 deletions src/stream/src/executor/aggregation/agg_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use risingwave_common::estimate_size::EstimateSize;
use risingwave_common::must_match;
use risingwave_common::types::Datum;
use risingwave_expr::aggregate::{AggCall, AggregateState, BoxedAggregateFunction};
use risingwave_pb::stream_plan::PbAggNodeVersion;
use risingwave_storage::StateStore;

use super::minput::MaterializedInputState;
Expand Down Expand Up @@ -65,6 +66,7 @@ impl AggState {
/// Create an [`AggState`] from a given [`AggCall`].
#[allow(clippy::too_many_arguments)]
pub fn create(
version: PbAggNodeVersion,
agg_call: &AggCall,
agg_func: &BoxedAggregateFunction,
storage: &AggStateStorage<impl StateStore>,
Expand All @@ -83,6 +85,7 @@ impl AggState {
}
AggStateStorage::MaterializedInput { mapping, .. } => {
Self::MaterializedInput(Box::new(MaterializedInputState::new(
version,
agg_call,
pk_indices,
mapping,
Expand Down
21 changes: 21 additions & 0 deletions src/stream/src/executor/aggregation/minput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use risingwave_common::types::Datum;
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;
use risingwave_expr::aggregate::{AggCall, AggKind, BoxedAggregateFunction};
use risingwave_pb::stream_plan::PbAggNodeVersion;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::StateStore;

Expand Down Expand Up @@ -68,6 +69,7 @@ pub struct MaterializedInputState {
impl MaterializedInputState {
/// Create an instance from [`AggCall`].
pub fn new(
version: PbAggNodeVersion,
agg_call: &AggCall,
pk_indices: &PkIndices,
col_mapping: &StateTableColumnMapping,
Expand Down Expand Up @@ -103,6 +105,12 @@ impl MaterializedInputState {
};

if agg_call.distinct {
if version < PbAggNodeVersion::Issue12140 {
panic!(
"RisingWave versions before issue #12140 is resolved has critical bug, you may need to re-create current MV to ensure correctness."
);
}

// If distinct, we need to materialize input with the distinct keys
// As we only support single-column distinct for now, we use the
// `agg_call.args.val_indices()[0]` as the distinct key.
Expand Down Expand Up @@ -266,6 +274,7 @@ mod tests {
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::sort_util::OrderType;
use risingwave_expr::aggregate::{build_append_only, AggCall};
use risingwave_pb::stream_plan::PbAggNodeVersion;
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::StateStore;

Expand Down Expand Up @@ -338,6 +347,7 @@ mod tests {
.await;

let mut state = MaterializedInputState::new(
PbAggNodeVersion::Max,
&agg_call,
&input_pk_indices,
&mapping,
Expand Down Expand Up @@ -390,6 +400,7 @@ mod tests {
{
// test recovery (cold start)
let mut state = MaterializedInputState::new(
PbAggNodeVersion::Max,
&agg_call,
&input_pk_indices,
&mapping,
Expand Down Expand Up @@ -431,6 +442,7 @@ mod tests {
.await;

let mut state = MaterializedInputState::new(
PbAggNodeVersion::Max,
&agg_call,
&input_pk_indices,
&mapping,
Expand Down Expand Up @@ -483,6 +495,7 @@ mod tests {
{
// test recovery (cold start)
let mut state = MaterializedInputState::new(
PbAggNodeVersion::Max,
&agg_call,
&input_pk_indices,
&mapping,
Expand Down Expand Up @@ -540,6 +553,7 @@ mod tests {
table_2.init_epoch(epoch);

let mut state_1 = MaterializedInputState::new(
PbAggNodeVersion::Max,
&agg_call_1,
&input_pk_indices,
&mapping_1,
Expand All @@ -549,6 +563,7 @@ mod tests {
.unwrap();

let mut state_2 = MaterializedInputState::new(
PbAggNodeVersion::Max,
&agg_call_2,
&input_pk_indices,
&mapping_2,
Expand Down Expand Up @@ -632,6 +647,7 @@ mod tests {
.await;

let mut state = MaterializedInputState::new(
PbAggNodeVersion::Max,
&agg_call,
&input_pk_indices,
&mapping,
Expand Down Expand Up @@ -683,6 +699,7 @@ mod tests {
{
// test recovery (cold start)
let mut state = MaterializedInputState::new(
PbAggNodeVersion::Max,
&agg_call,
&input_pk_indices,
&mapping,
Expand Down Expand Up @@ -726,6 +743,7 @@ mod tests {
table.init_epoch(epoch);

let mut state = MaterializedInputState::new(
PbAggNodeVersion::Max,
&agg_call,
&input_pk_indices,
&mapping,
Expand Down Expand Up @@ -825,6 +843,7 @@ mod tests {
.await;

let mut state = MaterializedInputState::new(
PbAggNodeVersion::Max,
&agg_call,
&input_pk_indices,
&mapping,
Expand Down Expand Up @@ -932,6 +951,7 @@ mod tests {
.await;

let mut state = MaterializedInputState::new(
PbAggNodeVersion::Max,
&agg_call,
&input_pk_indices,
&mapping,
Expand Down Expand Up @@ -1011,6 +1031,7 @@ mod tests {
.await;

let mut state = MaterializedInputState::new(
PbAggNodeVersion::Max,
&agg_call,
&input_pk_indices,
&mapping,
Expand Down
7 changes: 7 additions & 0 deletions src/stream/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use risingwave_common::types::ScalarImpl;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_expr::aggregate::{build_retractable, AggCall, BoxedAggregateFunction};
use risingwave_pb::stream_plan::PbAggNodeVersion;
use risingwave_storage::StateStore;

use super::agg_common::{AggExecutorArgs, HashAggExecutorExtraArgs};
Expand Down Expand Up @@ -81,6 +82,9 @@ pub struct HashAggExecutor<K: HashKey, S: StateStore> {
struct ExecutorInner<K: HashKey, S: StateStore> {
_phantom: PhantomData<K>,

/// Version of aggregation executors.
version: PbAggNodeVersion,

actor_ctx: ActorContextRef,
info: ExecutorInfo,

Expand Down Expand Up @@ -233,6 +237,7 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
input: args.input,
inner: ExecutorInner {
_phantom: PhantomData,
version: args.version,
actor_ctx: args.actor_ctx,
info: ExecutorInfo {
schema,
Expand Down Expand Up @@ -318,6 +323,7 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
// Create `AggGroup` for the current group if not exists. This will
// restore agg states from the intermediate state table.
let agg_group = AggGroup::create(
this.version,
Some(GroupKey::new(
key.deserialize(group_key_types)?,
Some(this.group_key_table_pk_projection.clone()),
Expand Down Expand Up @@ -466,6 +472,7 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
let states = row.into_iter().skip(this.group_key_indices.len()).collect();

let mut agg_group = AggGroup::create_eowc(
this.version,
Some(GroupKey::new(
group_key,
Some(this.group_key_table_pk_projection.clone()),
Expand Down
6 changes: 6 additions & 0 deletions src/stream/src/executor/simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_expr::aggregate::{build_retractable, AggCall, BoxedAggregateFunction};
use risingwave_pb::stream_plan::PbAggNodeVersion;
use risingwave_storage::StateStore;

use super::agg_common::{AggExecutorArgs, SimpleAggExecutorExtraArgs};
Expand Down Expand Up @@ -52,6 +53,9 @@ pub struct SimpleAggExecutor<S: StateStore> {
}

struct ExecutorInner<S: StateStore> {
/// Version of aggregation executors.
version: PbAggNodeVersion,

actor_ctx: ActorContextRef,
info: ExecutorInfo,

Expand Down Expand Up @@ -135,6 +139,7 @@ impl<S: StateStore> SimpleAggExecutor<S> {
Ok(Self {
input: args.input,
inner: ExecutorInner {
version: args.version,
actor_ctx: args.actor_ctx,
info: ExecutorInfo {
schema,
Expand Down Expand Up @@ -260,6 +265,7 @@ impl<S: StateStore> SimpleAggExecutor<S> {
let mut vars = ExecutionVars {
// This will fetch previous agg states from the intermediate state table.
agg_group: AggGroup::create(
this.version,
None,
&this.agg_calls,
&this.agg_funcs,
Expand Down
5 changes: 5 additions & 0 deletions src/stream/src/executor/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ pub mod agg_executor {
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
use risingwave_expr::aggregate::{AggCall, AggKind};
use risingwave_pb::stream_plan::PbAggNodeVersion;
use risingwave_storage::StateStore;

use crate::common::table::state_table::StateTable;
Expand Down Expand Up @@ -436,6 +437,8 @@ pub mod agg_executor {
.await;

HashAggExecutor::<SerializedKey, S>::new(AggExecutorArgs {
version: PbAggNodeVersion::Max,

input,
actor_ctx: ActorContext::create(123),
pk_indices,
Expand Down Expand Up @@ -499,6 +502,8 @@ pub mod agg_executor {
.await;

SimpleAggExecutor::new(AggExecutorArgs {
version: PbAggNodeVersion::Max,

input,
actor_ctx,
pk_indices,
Expand Down
2 changes: 2 additions & 0 deletions src/stream/src/from_proto/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ impl ExecutorBuilder for HashAggExecutorBuilder {

HashAggExecutorDispatcherArgs {
args: AggExecutorArgs {
version: node.version(),

input,
actor_ctx: params.actor_context,
pk_indices: params.pk_indices,
Expand Down
2 changes: 2 additions & 0 deletions src/stream/src/from_proto/simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ impl ExecutorBuilder for SimpleAggExecutorBuilder {
.await;

Ok(SimpleAggExecutor::new(AggExecutorArgs {
version: node.version(),

input,
actor_ctx: params.actor_context,
pk_indices: params.pk_indices,
Expand Down

0 comments on commit 534cbba

Please sign in to comment.