Commit

fix(stream): fix the minput's indices when it's for distinct call (#13015)

Signed-off-by: Richard Chien <[email protected]>
Co-authored-by: Richard Chien <[email protected]>
Co-authored-by: Bugen Zhao <[email protected]>
3 people committed Oct 27, 2023
1 parent ec5146f commit b00281e
Showing 17 changed files with 296 additions and 19 deletions.
75 changes: 75 additions & 0 deletions e2e_test/streaming/bug_fixes/issue_12140.slt
@@ -0,0 +1,75 @@
# https://github.com/risingwavelabs/risingwave/issues/12140

statement ok
CREATE TABLE t (c3 INT, c9 CHARACTER VARYING);

statement ok
INSERT INTO t VALUES (1, 'interesting'), (2, 'interesting'), (3, 'interesting'), (4, 'IwfwuseZmg'), (5, 'ZuT4aIQVhA');

statement ok
CREATE MATERIALIZED VIEW mv AS
SELECT
first_value(DISTINCT t.c9 ORDER BY t.c9 ASC)
FROM
t;

statement ok
DELETE FROM t WHERE c3 = 1;

statement ok
DELETE FROM t WHERE c3 = 2;

statement ok
DELETE FROM t WHERE c3 = 3;

statement ok
drop materialized view mv;

statement ok
drop table t;

statement ok
CREATE TABLE t (c3 INT, c9 CHARACTER VARYING);

statement ok
INSERT INTO t VALUES (1, 'interesting'), (2, 'interesting'), (3, 'interesting'), (1, 'boring'), (2, 'boring'), (3, 'boring'), (1, 'exciting'), (2, 'exciting'), (3, 'exciting'), (4, 'IwfwuseZmg'), (5, 'ZuT4aIQVhA');

statement ok
CREATE MATERIALIZED VIEW mv AS
SELECT
first_value(DISTINCT t.c9 ORDER BY t.c9 ASC), last_value(distinct c3 order by c3 asc)
FROM
t;

statement ok
DELETE FROM t WHERE c3 = 1 and c9 = 'interesting';

statement ok
DELETE FROM t WHERE c3 = 2 and c9 = 'interesting';

statement ok
DELETE FROM t WHERE c3 = 3 and c9 = 'interesting';

statement ok
DELETE FROM t WHERE c3 = 1 and c9 = 'boring';

statement ok
DELETE FROM t WHERE c3 = 1 and c9 = 'exciting';

statement ok
DELETE FROM t WHERE c3 = 2 and c9 = 'boring';

statement ok
DELETE FROM t WHERE c3 = 2 and c9 = 'exciting';

statement ok
DELETE FROM t WHERE c3 = 3 and c9 = 'boring';

statement ok
DELETE FROM t WHERE c3 = 3 and c9 = 'exciting';

statement ok
drop materialized view mv;

statement ok
drop table t;
12 changes: 12 additions & 0 deletions proto/stream_plan.proto
@@ -249,6 +249,16 @@ message AggCallState {
reserved "table_state";
}

enum AggNodeVersion {
AGG_NODE_VERSION_UNSPECIFIED = 0;

// https://github.com/risingwavelabs/risingwave/issues/12140#issuecomment-1776289808
AGG_NODE_VERSION_ISSUE_12140 = 1;

// Used for test only.
AGG_NODE_VERSION_MAX = 2147483647;
}

message SimpleAggNode {
repeated expr.AggCall agg_calls = 1;
// Only used for stateless simple agg.
@@ -260,6 +270,7 @@ message SimpleAggNode {
bool is_append_only = 5;
map<uint32, catalog.Table> distinct_dedup_tables = 6;
uint32 row_count_index = 7;
AggNodeVersion version = 8;
}

message HashAggNode {
@@ -273,6 +284,7 @@ message HashAggNode {
map<uint32, catalog.Table> distinct_dedup_tables = 6;
uint32 row_count_index = 7;
bool emit_on_window_close = 8;
AggNodeVersion version = 9;
}

message TopNNode {
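
The new `AggNodeVersion` enum lets stream executors distinguish plans serialized before and after this fix, so state tables created by older plans keep their original key layout. As a rough, self-contained sketch of how such a version gate can be used (illustrative only; this is not the generated `PbAggNodeVersion` type nor code from this commit):

```rust
// Mirrors the proto enum added above; prost-generated enums typically derive Ord too.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum AggNodeVersion {
    Unspecified = 0,
    Issue12140 = 1,
}

/// Whether a distinct agg call keys its materialized-input state table by the
/// distinct column (new layout) instead of the input primary key (old layout).
fn use_distinct_key(version: AggNodeVersion, is_distinct: bool) -> bool {
    is_distinct && version >= AggNodeVersion::Issue12140
}

fn main() {
    assert!(use_distinct_key(AggNodeVersion::Issue12140, true));
    // Plans serialized before the fix fall back to the old pk-based layout.
    assert!(!use_distinct_key(AggNodeVersion::Unspecified, true));
    assert!(!use_distinct_key(AggNodeVersion::Issue12140, false));
}
```
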
6 changes: 6 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/agg.yaml
@@ -912,6 +912,12 @@
expected_outputs:
- batch_plan
- stream_plan
- stream_dist_plan # check the state table schema
- sql: |
create table t (x int, y int);
select first_value(distinct x order by x asc) from t;
expected_outputs:
- stream_dist_plan # check the state table schema
- sql: |
create table t (x int, y int);
select last_value(x order by y desc nulls last) from t;
102 changes: 102 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/agg.yaml
@@ -1613,6 +1613,108 @@
└─StreamSimpleAgg { aggs: [first_value(t.x order_by(t.y ASC)), count] }
└─StreamExchange { dist: Single }
└─StreamTableScan { table: t, columns: [t.x, t.y, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [first_value], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
├── materialized table: 4294967294
└── StreamProject { exprs: [first_value(t.x order_by(t.y ASC))] }
└── StreamSimpleAgg { aggs: [first_value(t.x order_by(t.y ASC)), count] }
├── intermediate state table: 1
├── state tables: [ 0 ]
├── distinct tables: []
└── StreamExchange Single from 1
Fragment 1
Chain { table: t, columns: [t.x, t.y, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
├── state table: 2
├── Upstream
└── BatchPlanNode
Table 0
├── columns: [ t_y, t__row_id, t_x ]
├── primary key: [ $0 ASC, $1 ASC ]
├── value indices: [ 0, 1, 2 ]
├── distribution key: []
└── read pk prefix len hint: 0
Table 1
├── columns: [ first_value(t_x order_by(t_y ASC)), count ]
├── primary key: []
├── value indices: [ 0, 1 ]
├── distribution key: []
└── read pk prefix len hint: 0
Table 2
├── columns: [ vnode, _row_id, t_backfill_finished, t_row_count ]
├── primary key: [ $0 ASC ]
├── value indices: [ 1, 2, 3 ]
├── distribution key: [ 0 ]
├── read pk prefix len hint: 1
└── vnode column idx: 0
Table 4294967294
├── columns: [ first_value ]
├── primary key: []
├── value indices: [ 0 ]
├── distribution key: []
└── read pk prefix len hint: 0
- sql: |
create table t (x int, y int);
select first_value(distinct x order by x asc) from t;
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [first_value], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
├── materialized table: 4294967294
└── StreamProject { exprs: [first_value(distinct t.x order_by(t.x ASC))] }
└── StreamSimpleAgg { aggs: [first_value(distinct t.x order_by(t.x ASC)), count] }
├── intermediate state table: 1
├── state tables: [ 0 ]
├── distinct tables: [ (distinct key: t.x, table id: 2) ]
└── StreamExchange Single from 1
Fragment 1
Chain { table: t, columns: [t.x, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
├── state table: 3
├── Upstream
└── BatchPlanNode
Table 0
├── columns: [ t_x ]
├── primary key: [ $0 ASC ]
├── value indices: [ 0 ]
├── distribution key: []
└── read pk prefix len hint: 0
Table 1
├── columns: [ first_value(distinct t_x order_by(t_x ASC)), count ]
├── primary key: []
├── value indices: [ 0, 1 ]
├── distribution key: []
└── read pk prefix len hint: 0
Table 2
├── columns: [ t_x, count_for_agg_call_0 ]
├── primary key: [ $0 ASC ]
├── value indices: [ 1 ]
├── distribution key: []
└── read pk prefix len hint: 1
Table 3
├── columns: [ vnode, _row_id, t_backfill_finished, t_row_count ]
├── primary key: [ $0 ASC ]
├── value indices: [ 1, 2, 3 ]
├── distribution key: [ 0 ]
├── read pk prefix len hint: 1
└── vnode column idx: 0
Table 4294967294
├── columns: [ first_value ]
├── primary key: []
├── value indices: [ 0 ]
├── distribution key: []
└── read pk prefix len hint: 0
- sql: |
create table t (x int, y int);
select last_value(x order by y desc nulls last) from t;
19 changes: 16 additions & 3 deletions src/frontend/src/optimizer/plan_node/generic/agg.rs
@@ -13,7 +13,7 @@
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::fmt;
use std::{fmt, vec};

use fixedbitset::FixedBitSet;
use itertools::{Either, Itertools};
@@ -348,6 +348,7 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
let in_dist_key = self.input.distribution().dist_column_indices().to_vec();

let gen_materialized_input_state = |sort_keys: Vec<(OrderType, usize)>,
extra_keys: Vec<usize>,
include_keys: Vec<usize>|
-> MaterializedInputState {
let (mut table_builder, mut included_upstream_indices, mut column_mapping) =
@@ -375,7 +376,7 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
for (order_type, idx) in sort_keys {
add_column(idx, Some(order_type), true, &mut table_builder);
}
for &idx in &in_pks {
for idx in extra_keys {
add_column(idx, Some(OrderType::ascending()), true, &mut table_builder);
}
for idx in include_keys {
@@ -458,6 +459,17 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
_ => unreachable!(),
}
};

// columns to ensure each row unique
let extra_keys = if agg_call.distinct {
// if distinct, use distinct keys as extra keys
let distinct_key = agg_call.inputs[0].index;
vec![distinct_key]
} else {
// if not distinct, use primary keys as extra keys
in_pks.clone()
};

// other columns that should be contained in state table
let include_keys = match agg_call.agg_kind {
AggKind::FirstValue
Expand All @@ -470,7 +482,8 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
}
_ => vec![],
};
let state = gen_materialized_input_state(sort_keys, include_keys);

let state = gen_materialized_input_state(sort_keys, extra_keys, include_keys);
AggCallState::MaterializedInput(Box::new(state))
}
agg_kinds::rewritten!() => {
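
This hunk is the core of the planner-side fix: the extra key columns of a materialized-input (minput) agg state table are no longer always the input primary key. For `agg(DISTINCT col ...)` the rows written to this table are already deduplicated per distinct value, so the distinct column itself is what identifies a row. A simplified, self-contained sketch of the key selection (illustrative names, not the committed `gen_materialized_input_state`):

```rust
/// Key columns of the minput state table: ORDER BY columns first, then the
/// columns that make each input row unique.
fn minput_key_columns(
    sort_keys: &[usize],
    distinct_col: Option<usize>, // Some(col) for `agg(DISTINCT col ORDER BY ...)`
    input_pk: &[usize],
) -> Vec<usize> {
    let mut key = sort_keys.to_vec();
    // Extra keys: the distinct column for a distinct call, otherwise the upstream
    // primary key (which was the behavior for all calls before this fix).
    let extra = match distinct_col {
        Some(col) => vec![col],
        None => input_pk.to_vec(),
    };
    for idx in extra {
        if !key.contains(&idx) {
            key.push(idx); // each upstream column appears at most once in the key
        }
    }
    key
}

fn main() {
    // first_value(DISTINCT x ORDER BY x) on t(x, y, _row_id): key is just x.
    assert_eq!(minput_key_columns(&[0], Some(0), &[2]), vec![0]);
    // Non-distinct first_value(x ORDER BY y): key is (y, _row_id).
    assert_eq!(minput_key_columns(&[1], None, &[2]), vec![1, 2]);
}
```

This matches the new planner-test output above: for `first_value(distinct x order by x asc)`, Table 0 is keyed by `t_x` alone, while the non-distinct variant keeps the `(t_y, t__row_id)` key.
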
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_hash_agg.rs
@@ -215,6 +215,7 @@ impl StreamNode for StreamHashAgg {
.collect(),
row_count_index: self.row_count_idx as u32,
emit_on_window_close: self.base.emit_on_window_close,
version: PbAggNodeVersion::Issue12140 as _,
})
}
}
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_simple_agg.rs
@@ -128,6 +128,7 @@ impl StreamNode for StreamSimpleAgg {
})
.collect(),
row_count_index: self.row_count_idx as u32,
version: PbAggNodeVersion::Issue12140 as _,
})
}
}

@@ -103,6 +103,7 @@ impl StreamNode for StreamStatelessSimpleAgg {
intermediate_state_table: None,
is_append_only: self.input().append_only(),
distinct_dedup_tables: Default::default(),
version: AggNodeVersion::Issue12140 as _,
})
}
}
3 changes: 3 additions & 0 deletions src/stream/src/executor/agg_common.rs
@@ -16,6 +16,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use risingwave_expr::aggregate::AggCall;
use risingwave_pb::stream_plan::PbAggNodeVersion;
use risingwave_storage::StateStore;

use super::aggregation::AggStateStorage;
@@ -27,6 +28,8 @@ use crate::task::AtomicU64Ref;

/// Arguments needed to construct an `XxxAggExecutor`.
pub struct AggExecutorArgs<S: StateStore, E: AggExecutorExtraArgs> {
pub version: PbAggNodeVersion,

// basic
pub input: Box<dyn Executor>,
pub actor_ctx: ActorContextRef,
5 changes: 5 additions & 0 deletions src/stream/src/executor/aggregation/agg_group.rs
@@ -25,6 +25,7 @@ use risingwave_common::must_match;
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_expr::aggregate::{AggCall, BoxedAggregateFunction};
use risingwave_pb::stream_plan::PbAggNodeVersion;
use risingwave_storage::StateStore;

use super::agg_state::{AggState, AggStateStorage};
@@ -192,6 +193,7 @@ impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
/// For [`crate::executor::SimpleAggExecutor`], the `group_key` should be `None`.
#[allow(clippy::too_many_arguments)]
pub async fn create(
version: PbAggNodeVersion,
group_key: Option<GroupKey>,
agg_calls: &[AggCall],
agg_funcs: &[BoxedAggregateFunction],
@@ -212,6 +214,7 @@ impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
let mut states = Vec::with_capacity(agg_calls.len());
for (idx, (agg_call, agg_func)) in agg_calls.iter().zip_eq_fast(agg_funcs).enumerate() {
let state = AggState::create(
version,
agg_call,
agg_func,
&storages[idx],
@@ -242,6 +245,7 @@ impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
/// Create a group from encoded states for EOWC. The previous output is set to `None`.
#[allow(clippy::too_many_arguments)]
pub fn create_eowc(
version: PbAggNodeVersion,
group_key: Option<GroupKey>,
agg_calls: &[AggCall],
agg_funcs: &[BoxedAggregateFunction],
@@ -255,6 +259,7 @@ impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
let mut states = Vec::with_capacity(agg_calls.len());
for (idx, (agg_call, agg_func)) in agg_calls.iter().zip_eq_fast(agg_funcs).enumerate() {
let state = AggState::create(
version,
agg_call,
agg_func,
&storages[idx],
3 changes: 3 additions & 0 deletions src/stream/src/executor/aggregation/agg_state.rs
@@ -19,6 +19,7 @@ use risingwave_common::estimate_size::EstimateSize;
use risingwave_common::must_match;
use risingwave_common::types::Datum;
use risingwave_expr::aggregate::{AggCall, AggregateState, BoxedAggregateFunction};
use risingwave_pb::stream_plan::PbAggNodeVersion;
use risingwave_storage::StateStore;

use super::minput::MaterializedInputState;
@@ -65,6 +66,7 @@ impl AggState {
/// Create an [`AggState`] from a given [`AggCall`].
#[allow(clippy::too_many_arguments)]
pub fn create(
version: PbAggNodeVersion,
agg_call: &AggCall,
agg_func: &BoxedAggregateFunction,
storage: &AggStateStorage<impl StateStore>,
@@ -83,6 +85,7 @@
}
AggStateStorage::MaterializedInput { mapping, .. } => {
Self::MaterializedInput(Box::new(MaterializedInputState::new(
version,
agg_call,
pk_indices,
mapping,
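
The constructor changes above thread the plan-node `version` down to every agg state because the executor-side `MaterializedInputState` (defined in `minput.rs`, one of the changed files not expanded on this page) has to interpret the state table with the same key layout the frontend chose. A minimal sketch of the kind of gate the executor can apply, with the caveat that the names here are illustrative and the committed `minput.rs` code may differ:

```rust
// Illustrative stand-in for the generated plan-node version enum.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum AggNodeVersion {
    Unspecified = 0,
    Issue12140 = 1,
}

/// Columns (after the ORDER BY columns) that the executor expects in the
/// minput state-table key, mirroring the frontend-side choice sketched earlier.
fn expected_extra_key(
    version: AggNodeVersion,
    distinct_col: Option<usize>,
    input_pk: &[usize],
) -> Vec<usize> {
    match distinct_col {
        // Only plans stamped with the new version key distinct calls by the
        // distinct column; older plans still use the input primary key.
        Some(col) if version >= AggNodeVersion::Issue12140 => vec![col],
        _ => input_pk.to_vec(),
    }
}

fn main() {
    assert_eq!(expected_extra_key(AggNodeVersion::Issue12140, Some(0), &[2]), vec![0]);
    assert_eq!(expected_extra_key(AggNodeVersion::Unspecified, Some(0), &[2]), vec![2]);
}
```
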
