fix(stream): fix the minput's indices when it's for a distinct call (cherry-pick #13015) #13106

Merged · 2 commits · Oct 27, 2023
75 changes: 75 additions & 0 deletions e2e_test/streaming/bug_fixes/issue_12140.slt
@@ -0,0 +1,75 @@
# https://github.com/risingwavelabs/risingwave/issues/12140
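# Regression test for the materialized input state of DISTINCT agg calls: the
# statements below only need to complete without error; the DELETEs exercise
# state cleanup for the DISTINCT first_value/last_value aggregates.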

statement ok
CREATE TABLE t (c3 INT, c9 CHARACTER VARYING);

statement ok
INSERT INTO t VALUES (1, 'interesting'), (2, 'interesting'), (3, 'interesting'), (4, 'IwfwuseZmg'), (5, 'ZuT4aIQVhA');

statement ok
CREATE MATERIALIZED VIEW mv AS
SELECT
first_value(DISTINCT t.c9 ORDER BY t.c9 ASC)
FROM
t;

statement ok
DELETE FROM t WHERE c3 = 1;

statement ok
DELETE FROM t WHERE c3 = 2;

statement ok
DELETE FROM t WHERE c3 = 3;

statement ok
drop materialized view mv;

statement ok
drop table t;

statement ok
CREATE TABLE t (c3 INT, c9 CHARACTER VARYING);

statement ok
INSERT INTO t VALUES (1, 'interesting'), (2, 'interesting'), (3, 'interesting'), (1, 'boring'), (2, 'boring'), (3, 'boring'), (1, 'exciting'), (2, 'exciting'), (3, 'exciting'), (4, 'IwfwuseZmg'), (5, 'ZuT4aIQVhA');

statement ok
CREATE MATERIALIZED VIEW mv AS
SELECT
first_value(DISTINCT t.c9 ORDER BY t.c9 ASC), last_value(distinct c3 order by c3 asc)
FROM
t;

statement ok
DELETE FROM t WHERE c3 = 1 and c9 = 'interesting';

statement ok
DELETE FROM t WHERE c3 = 2 and c9 = 'interesting';

statement ok
DELETE FROM t WHERE c3 = 3 and c9 = 'interesting';

statement ok
DELETE FROM t WHERE c3 = 1 and c9 = 'boring';

statement ok
DELETE FROM t WHERE c3 = 1 and c9 = 'exciting';

statement ok
DELETE FROM t WHERE c3 = 2 and c9 = 'boring';

statement ok
DELETE FROM t WHERE c3 = 2 and c9 = 'exciting';

statement ok
DELETE FROM t WHERE c3 = 3 and c9 = 'boring';

statement ok
DELETE FROM t WHERE c3 = 3 and c9 = 'exciting';

statement ok
drop materialized view mv;

statement ok
drop table t;
12 changes: 12 additions & 0 deletions proto/stream_plan.proto
@@ -249,6 +249,16 @@ message AggCallState {
reserved "table_state";
}

enum AggNodeVersion {
AGG_NODE_VERSION_UNSPECIFIED = 0;

// https://github.com/risingwavelabs/risingwave/issues/12140#issuecomment-1776289808
AGG_NODE_VERSION_ISSUE_12140 = 1;

// Used for test only.
AGG_NODE_VERSION_MAX = 2147483647;
}

message SimpleAggNode {
repeated expr.AggCall agg_calls = 1;
// Only used for stateless simple agg.
@@ -260,6 +270,7 @@ message SimpleAggNode {
bool is_append_only = 5;
map<uint32, catalog.Table> distinct_dedup_tables = 6;
uint32 row_count_index = 7;
AggNodeVersion version = 8;
}

message HashAggNode {
@@ -273,6 +284,7 @@ message HashAggNode {
map<uint32, catalog.Table> distinct_dedup_tables = 6;
uint32 row_count_index = 7;
bool emit_on_window_close = 8;
AggNodeVersion version = 9;
}

message TopNNode {
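The new version field lets executors tell apart plan nodes (and hence state-table layouts) created before and after this fix: regular frontend plans carry AGG_NODE_VERSION_ISSUE_12140, while the frontend's test-only paths use AGG_NODE_VERSION_MAX. A minimal sketch of how that gating could look, assuming the consumer simply compares versions; the enum below is a self-contained mirror for illustration, not the generated prost type:

// Illustration only: a local mirror of proto AggNodeVersion (assumed usage,
// not RisingWave source).
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum AggNodeVersion {
    Unspecified = 0,
    Issue12140 = 1,   // plans created after this fix
    Max = 2147483647, // test-only plans built in the frontend
}

/// Hypothetical helper: should the materialized-input state of a DISTINCT agg
/// call be keyed by the distinct column (new layout) instead of the input
/// primary key (old layout)?
fn distinct_call_uses_new_layout(version: AggNodeVersion) -> bool {
    version >= AggNodeVersion::Issue12140
}

fn main() {
    assert!(distinct_call_uses_new_layout(AggNodeVersion::Issue12140));
    assert!(distinct_call_uses_new_layout(AggNodeVersion::Max));
    assert!(!distinct_call_uses_new_layout(AggNodeVersion::Unspecified));
}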
6 changes: 6 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/agg.yaml
@@ -912,6 +912,12 @@
expected_outputs:
- batch_plan
- stream_plan
- stream_dist_plan # check the state table schema
- sql: |
create table t (x int, y int);
select first_value(distinct x order by x asc) from t;
expected_outputs:
- stream_dist_plan # check the state table schema
- sql: |
create table t (x int, y int);
select last_value(x order by y desc nulls last) from t;
102 changes: 102 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/agg.yaml
@@ -1613,6 +1613,108 @@
└─StreamSimpleAgg { aggs: [first_value(t.x order_by(t.y ASC)), count] }
└─StreamExchange { dist: Single }
└─StreamTableScan { table: t, columns: [t.x, t.y, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [first_value], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
├── materialized table: 4294967294
└── StreamProject { exprs: [first_value(t.x order_by(t.y ASC))] }
└── StreamSimpleAgg { aggs: [first_value(t.x order_by(t.y ASC)), count] }
├── intermediate state table: 1
├── state tables: [ 0 ]
├── distinct tables: []
└── StreamExchange Single from 1

Fragment 1
Chain { table: t, columns: [t.x, t.y, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
├── state table: 2
├── Upstream
└── BatchPlanNode

Table 0
├── columns: [ t_y, t__row_id, t_x ]
├── primary key: [ $0 ASC, $1 ASC ]
├── value indices: [ 0, 1, 2 ]
├── distribution key: []
└── read pk prefix len hint: 0

Table 1
├── columns: [ first_value(t_x order_by(t_y ASC)), count ]
├── primary key: []
├── value indices: [ 0, 1 ]
├── distribution key: []
└── read pk prefix len hint: 0

Table 2
├── columns: [ vnode, _row_id, t_backfill_finished, t_row_count ]
├── primary key: [ $0 ASC ]
├── value indices: [ 1, 2, 3 ]
├── distribution key: [ 0 ]
├── read pk prefix len hint: 1
└── vnode column idx: 0

Table 4294967294
├── columns: [ first_value ]
├── primary key: []
├── value indices: [ 0 ]
├── distribution key: []
└── read pk prefix len hint: 0

- sql: |
create table t (x int, y int);
select first_value(distinct x order by x asc) from t;
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [first_value], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
├── materialized table: 4294967294
└── StreamProject { exprs: [first_value(distinct t.x order_by(t.x ASC))] }
└── StreamSimpleAgg { aggs: [first_value(distinct t.x order_by(t.x ASC)), count] }
├── intermediate state table: 1
├── state tables: [ 0 ]
├── distinct tables: [ (distinct key: t.x, table id: 2) ]
└── StreamExchange Single from 1

Fragment 1
Chain { table: t, columns: [t.x, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
├── state table: 3
├── Upstream
└── BatchPlanNode

Table 0
├── columns: [ t_x ]
├── primary key: [ $0 ASC ]
├── value indices: [ 0 ]
├── distribution key: []
└── read pk prefix len hint: 0

Table 1
├── columns: [ first_value(distinct t_x order_by(t_x ASC)), count ]
├── primary key: []
├── value indices: [ 0, 1 ]
├── distribution key: []
└── read pk prefix len hint: 0

Table 2
├── columns: [ t_x, count_for_agg_call_0 ]
├── primary key: [ $0 ASC ]
├── value indices: [ 1 ]
├── distribution key: []
└── read pk prefix len hint: 1

Table 3
├── columns: [ vnode, _row_id, t_backfill_finished, t_row_count ]
├── primary key: [ $0 ASC ]
├── value indices: [ 1, 2, 3 ]
├── distribution key: [ 0 ]
├── read pk prefix len hint: 1
└── vnode column idx: 0

Table 4294967294
├── columns: [ first_value ]
├── primary key: []
├── value indices: [ 0 ]
├── distribution key: []
└── read pk prefix len hint: 0

- sql: |
create table t (x int, y int);
select last_value(x order by y desc nulls last) from t;
19 changes: 16 additions & 3 deletions src/frontend/src/optimizer/plan_node/generic/agg.rs
@@ -13,7 +13,7 @@
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::fmt;
use std::{fmt, vec};

use fixedbitset::FixedBitSet;
use itertools::{Either, Itertools};
@@ -348,6 +348,7 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
let in_dist_key = self.input.distribution().dist_column_indices().to_vec();

let gen_materialized_input_state = |sort_keys: Vec<(OrderType, usize)>,
extra_keys: Vec<usize>,
include_keys: Vec<usize>|
-> MaterializedInputState {
let (mut table_builder, mut included_upstream_indices, mut column_mapping) =
@@ -375,7 +376,7 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
for (order_type, idx) in sort_keys {
add_column(idx, Some(order_type), true, &mut table_builder);
}
for &idx in &in_pks {
for idx in extra_keys {
add_column(idx, Some(OrderType::ascending()), true, &mut table_builder);
}
for idx in include_keys {
Expand Down Expand Up @@ -458,6 +459,17 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
_ => unreachable!(),
}
};

// columns to ensure each row unique
let extra_keys = if agg_call.distinct {
// if distinct, use distinct keys as extra keys
let distinct_key = agg_call.inputs[0].index;
vec![distinct_key]
} else {
// if not distinct, use primary keys as extra keys
in_pks.clone()
};

// other columns that should be contained in state table
let include_keys = match agg_call.agg_kind {
AggKind::FirstValue
@@ -470,7 +482,8 @@
}
_ => vec![],
};
let state = gen_materialized_input_state(sort_keys, include_keys);

let state = gen_materialized_input_state(sort_keys, extra_keys, include_keys);
AggCallState::MaterializedInput(Box::new(state))
}
agg_kinds::rewritten!() => {
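The extra_keys choice above is the heart of the fix: for a DISTINCT call, the rows reaching this materialized-input ("minput") state are already deduplicated per distinct value, so the distinct column itself makes each stored row unique, instead of the input primary key. A sketch of how the state-table key then comes together, using assumed, simplified types rather than the planner's real table builder:

// Sketch under assumptions: column indices only; the real builder also tracks
// order types, include-keys stored as values, and a column mapping.
fn minput_key_columns(
    sort_keys: &[usize],
    distinct: bool,
    distinct_key: usize,
    input_pk: &[usize],
) -> Vec<usize> {
    // Extra keys ensure each stored row is unique.
    let extra: Vec<usize> = if distinct {
        vec![distinct_key] // distinct calls: one row per distinct value
    } else {
        input_pk.to_vec() // non-distinct calls: fall back to the input primary key
    };
    let mut key = Vec::new();
    for &idx in sort_keys.iter().chain(extra.iter()) {
        if !key.contains(&idx) {
            key.push(idx); // each input column appears at most once, as in the builder
        }
    }
    key
}

fn main() {
    // first_value(distinct x order by x) over t(x, y, _row_id) with pk [_row_id]:
    // the state table is keyed by x alone, matching Table 0 of the new planner
    // test output above (while Table 2, the distinct dedup table, keeps the
    // per-value reference count).
    assert_eq!(minput_key_columns(&[0], true, 0, &[2]), vec![0]);
    // Non-distinct first_value(x order by y) keeps the old layout: sort key + pk.
    assert_eq!(minput_key_columns(&[1], false, 0, &[2]), vec![1, 2]);
}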
3 changes: 3 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream.rs
@@ -599,6 +599,7 @@ pub fn to_stream_prost_body(
.into_iter()
.map(|(key_idx, table)| (key_idx as u32, table.to_internal_table_prost()))
.collect(),
version: PbAggNodeVersion::Max as _,
})
}
Node::GroupTopN(me) => {
@@ -658,6 +659,7 @@
.map(|(key_idx, table)| (key_idx as u32, table.to_internal_table_prost()))
.collect(),
emit_on_window_close: me.emit_on_window_close(),
version: PbAggNodeVersion::Max as _,
})
}
Node::HashJoin(_) => {
@@ -701,6 +703,7 @@
intermediate_state_table: None,
is_append_only: me.input.0.append_only,
distinct_dedup_tables: Default::default(),
version: PbAggNodeVersion::Max as _,
})
}
Node::Materialize(me) => {
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_hash_agg.rs
@@ -215,6 +215,7 @@ impl StreamNode for StreamHashAgg {
.collect(),
row_count_index: self.row_count_idx as u32,
emit_on_window_close: self.base.emit_on_window_close,
version: PbAggNodeVersion::Issue12140 as _,
})
}
}
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_simple_agg.rs
@@ -128,6 +128,7 @@ impl StreamNode for StreamSimpleAgg {
})
.collect(),
row_count_index: self.row_count_idx as u32,
version: PbAggNodeVersion::Issue12140 as _,
})
}
}
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs
@@ -103,6 +103,7 @@ impl StreamNode for StreamStatelessSimpleAgg {
intermediate_state_table: None,
is_append_only: self.input().append_only(),
distinct_dedup_tables: Default::default(),
version: AggNodeVersion::Issue12140 as _,
})
}
}
3 changes: 3 additions & 0 deletions src/stream/src/executor/agg_common.rs
@@ -16,6 +16,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use risingwave_expr::aggregate::AggCall;
use risingwave_pb::stream_plan::PbAggNodeVersion;
use risingwave_storage::StateStore;

use super::aggregation::AggStateStorage;
@@ -27,6 +28,8 @@ use crate::task::AtomicU64Ref;

/// Arguments needed to construct an `XxxAggExecutor`.
pub struct AggExecutorArgs<S: StateStore, E: AggExecutorExtraArgs> {
pub version: PbAggNodeVersion,

// basic
pub input: Box<dyn Executor>,
pub actor_ctx: ActorContextRef,
5 changes: 5 additions & 0 deletions src/stream/src/executor/aggregation/agg_group.rs
@@ -25,6 +25,7 @@ use risingwave_common::must_match;
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_expr::aggregate::{AggCall, BoxedAggregateFunction};
use risingwave_pb::stream_plan::PbAggNodeVersion;
use risingwave_storage::StateStore;

use super::agg_state::{AggState, AggStateStorage};
@@ -192,6 +193,7 @@ impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
/// For [`crate::executor::SimpleAggExecutor`], the `group_key` should be `None`.
#[allow(clippy::too_many_arguments)]
pub async fn create(
version: PbAggNodeVersion,
group_key: Option<GroupKey>,
agg_calls: &[AggCall],
agg_funcs: &[BoxedAggregateFunction],
@@ -212,6 +214,7 @@ impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
let mut states = Vec::with_capacity(agg_calls.len());
for (idx, (agg_call, agg_func)) in agg_calls.iter().zip_eq_fast(agg_funcs).enumerate() {
let state = AggState::create(
version,
agg_call,
agg_func,
&storages[idx],
@@ -242,6 +245,7 @@ impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
/// Create a group from encoded states for EOWC. The previous output is set to `None`.
#[allow(clippy::too_many_arguments)]
pub fn create_eowc(
version: PbAggNodeVersion,
group_key: Option<GroupKey>,
agg_calls: &[AggCall],
agg_funcs: &[BoxedAggregateFunction],
@@ -255,6 +259,7 @@ impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
let mut states = Vec::with_capacity(agg_calls.len());
for (idx, (agg_call, agg_func)) in agg_calls.iter().zip_eq_fast(agg_funcs).enumerate() {
let state = AggState::create(
version,
agg_call,
agg_func,
&storages[idx],