Skip to content

Commit

Permalink
Merge branch 'main' into bump-rustix-0-37
Browse files Browse the repository at this point in the history
  • Loading branch information
TennyZhuang authored Oct 26, 2023
2 parents 12ce46a + 4a8b0cb commit c954abe
Show file tree
Hide file tree
Showing 19 changed files with 330 additions and 21 deletions.
10 changes: 10 additions & 0 deletions e2e_test/batch/functions/substr.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,13 @@ query T
select substr('a', 2147483646, 1);
----
(empty)

query T
select substr('abcde'::bytea, 2, 7);
----
\x62636465

query T
select substr('abcde'::bytea, -2, 5);
----
\x6162
75 changes: 75 additions & 0 deletions e2e_test/streaming/bug_fixes/issue_12140.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# https://github.com/risingwavelabs/risingwave/issues/12140

statement ok
CREATE TABLE t (c3 INT, c9 CHARACTER VARYING);

statement ok
INSERT INTO t VALUES (1, 'interesting'), (2, 'interesting'), (3, 'interesting'), (4, 'IwfwuseZmg'), (5, 'ZuT4aIQVhA');

statement ok
CREATE MATERIALIZED VIEW mv AS
SELECT
first_value(DISTINCT t.c9 ORDER BY t.c9 ASC)
FROM
t;

statement ok
DELETE FROM t WHERE c3 = 1;

statement ok
DELETE FROM t WHERE c3 = 2;

statement ok
DELETE FROM t WHERE c3 = 3;

statement ok
drop materialized view mv;

statement ok
drop table t;

statement ok
CREATE TABLE t (c3 INT, c9 CHARACTER VARYING);

statement ok
INSERT INTO t VALUES (1, 'interesting'), (2, 'interesting'), (3, 'interesting'), (1, 'boring'), (2, 'boring'), (3, 'boring'), (1, 'exciting'), (2, 'exciting'), (3, 'exciting'), (4, 'IwfwuseZmg'), (5, 'ZuT4aIQVhA');

statement ok
CREATE MATERIALIZED VIEW mv AS
SELECT
first_value(DISTINCT t.c9 ORDER BY t.c9 ASC), last_value(distinct c3 order by c3 asc)
FROM
t;

statement ok
DELETE FROM t WHERE c3 = 1 and c9 = 'interesting';

statement ok
DELETE FROM t WHERE c3 = 2 and c9 = 'interesting';

statement ok
DELETE FROM t WHERE c3 = 3 and c9 = 'interesting';

statement ok
DELETE FROM t WHERE c3 = 1 and c9 = 'boring';

statement ok
DELETE FROM t WHERE c3 = 1 and c9 = 'exciting';

statement ok
DELETE FROM t WHERE c3 = 2 and c9 = 'boring';

statement ok
DELETE FROM t WHERE c3 = 2 and c9 = 'exciting';

statement ok
DELETE FROM t WHERE c3 = 3 and c9 = 'boring';

statement ok
DELETE FROM t WHERE c3 = 3 and c9 = 'exciting';

statement ok
drop materialized view mv;

statement ok
drop table t;
12 changes: 12 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,16 @@ message AggCallState {
reserved "table_state";
}

enum AggNodeVersion {
AGG_NODE_VERSION_UNSPECIFIED = 0;

// https://github.com/risingwavelabs/risingwave/issues/12140#issuecomment-1776289808
AGG_NODE_VERSION_ISSUE_12140 = 1;

// Used for test only.
AGG_NODE_VERSION_MAX = 2147483647;
}

message SimpleAggNode {
repeated expr.AggCall agg_calls = 1;
// Only used for stateless simple agg.
Expand All @@ -279,6 +289,7 @@ message SimpleAggNode {
bool is_append_only = 5;
map<uint32, catalog.Table> distinct_dedup_tables = 6;
uint32 row_count_index = 7;
AggNodeVersion version = 8;
}

message HashAggNode {
Expand All @@ -292,6 +303,7 @@ message HashAggNode {
map<uint32, catalog.Table> distinct_dedup_tables = 6;
uint32 row_count_index = 7;
bool emit_on_window_close = 8;
AggNodeVersion version = 9;
}

message TopNNode {
Expand Down
26 changes: 24 additions & 2 deletions src/expr/impl/src/scalar/substr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,14 @@ pub fn substr_start(s: &str, start: i32, writer: &mut impl Write) -> Result<()>
Ok(())
}

#[function("substr(varchar, int4, int4) -> varchar")]
pub fn substr_start_for(s: &str, start: i32, count: i32, writer: &mut impl Write) -> Result<()> {
/// Returns the suffix of `s` beginning at the 1-based byte position `start`.
///
/// Mirrors PostgreSQL's two-argument `substr` on `bytea`: a `start` of 1 or
/// less yields the whole input, while a `start` past the end yields an empty
/// result.
#[function("substr(bytea, int4) -> bytea")]
pub fn substr_start_bytea(s: &[u8], start: i32) -> Box<[u8]> {
    // Convert the 1-based (possibly negative) SQL position into a 0-based
    // offset; negative values clamp to 0, and we cap at `s.len()` so the
    // slice below cannot go out of bounds.
    let offset = usize::try_from(start.saturating_sub(1)).unwrap_or(0);
    s[offset.min(s.len())..].into()
}

fn convert_args(start: i32, count: i32) -> Result<(usize, usize)> {
if count < 0 {
return Err(ExprError::InvalidParam {
name: "length",
Expand All @@ -44,6 +50,15 @@ pub fn substr_start_for(s: &str, start: i32, count: i32, writer: &mut impl Write
count.saturating_add(start.saturating_sub(1)).max(0) as usize
};

// The returned args may still go out of bounds.
// So `skip` and `take` on iterator is safer than `[skip..(skip+take)]`
Ok((skip, take))
}

#[function("substr(varchar, int4, int4) -> varchar")]
pub fn substr_start_for(s: &str, start: i32, count: i32, writer: &mut impl Write) -> Result<()> {
let (skip, take) = convert_args(start, count)?;

let substr = s.chars().skip(skip).take(take);
for char in substr {
writer.write_char(char).unwrap();
Expand All @@ -52,6 +67,13 @@ pub fn substr_start_for(s: &str, start: i32, count: i32, writer: &mut impl Write
Ok(())
}

/// Returns up to `count` bytes of `s` starting at the 1-based position
/// `start`, mirroring PostgreSQL's three-argument `substr` on `bytea`.
///
/// # Errors
///
/// Propagates the error from [`convert_args`] when `count` is negative.
#[function("substr(bytea, int4, int4) -> bytea")]
pub fn substr_start_for_bytea(s: &[u8], start: i32, count: i32) -> Result<Box<[u8]>> {
    let (skip, take) = convert_args(start, count)?;
    // `skip`/`take` may point past the end of `s`, so clamp both bounds to
    // the slice length before indexing.
    let begin = skip.min(s.len());
    let end = skip.saturating_add(take).min(s.len());
    Ok(s[begin..end].into())
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
6 changes: 6 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,12 @@
expected_outputs:
- batch_plan
- stream_plan
- stream_dist_plan # check the state table schema
- sql: |
create table t (x int, y int);
select first_value(distinct x order by x asc) from t;
expected_outputs:
- stream_dist_plan # check the state table schema
- sql: |
create table t (x int, y int);
select last_value(x order by y desc nulls last) from t;
Expand Down
102 changes: 102 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1613,6 +1613,108 @@
└─StreamSimpleAgg { aggs: [first_value(t.x order_by(t.y ASC)), count] }
└─StreamExchange { dist: Single }
└─StreamTableScan { table: t, columns: [t.x, t.y, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [first_value], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
├── materialized table: 4294967294
└── StreamProject { exprs: [first_value(t.x order_by(t.y ASC))] }
└── StreamSimpleAgg { aggs: [first_value(t.x order_by(t.y ASC)), count] }
├── intermediate state table: 1
├── state tables: [ 0 ]
├── distinct tables: []
└── StreamExchange Single from 1
Fragment 1
Chain { table: t, columns: [t.x, t.y, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
├── state table: 2
├── Upstream
└── BatchPlanNode
Table 0
├── columns: [ t_y, t__row_id, t_x ]
├── primary key: [ $0 ASC, $1 ASC ]
├── value indices: [ 0, 1, 2 ]
├── distribution key: []
└── read pk prefix len hint: 0
Table 1
├── columns: [ first_value(t_x order_by(t_y ASC)), count ]
├── primary key: []
├── value indices: [ 0, 1 ]
├── distribution key: []
└── read pk prefix len hint: 0
Table 2
├── columns: [ vnode, _row_id, t_backfill_finished, t_row_count ]
├── primary key: [ $0 ASC ]
├── value indices: [ 1, 2, 3 ]
├── distribution key: [ 0 ]
├── read pk prefix len hint: 1
└── vnode column idx: 0
Table 4294967294
├── columns: [ first_value ]
├── primary key: []
├── value indices: [ 0 ]
├── distribution key: []
└── read pk prefix len hint: 0
- sql: |
create table t (x int, y int);
select first_value(distinct x order by x asc) from t;
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [first_value], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
├── materialized table: 4294967294
└── StreamProject { exprs: [first_value(distinct t.x order_by(t.x ASC))] }
└── StreamSimpleAgg { aggs: [first_value(distinct t.x order_by(t.x ASC)), count] }
├── intermediate state table: 1
├── state tables: [ 0 ]
├── distinct tables: [ (distinct key: t.x, table id: 2) ]
└── StreamExchange Single from 1
Fragment 1
Chain { table: t, columns: [t.x, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
├── state table: 3
├── Upstream
└── BatchPlanNode
Table 0
├── columns: [ t_x ]
├── primary key: [ $0 ASC ]
├── value indices: [ 0 ]
├── distribution key: []
└── read pk prefix len hint: 0
Table 1
├── columns: [ first_value(distinct t_x order_by(t_x ASC)), count ]
├── primary key: []
├── value indices: [ 0, 1 ]
├── distribution key: []
└── read pk prefix len hint: 0
Table 2
├── columns: [ t_x, count_for_agg_call_0 ]
├── primary key: [ $0 ASC ]
├── value indices: [ 1 ]
├── distribution key: []
└── read pk prefix len hint: 1
Table 3
├── columns: [ vnode, _row_id, t_backfill_finished, t_row_count ]
├── primary key: [ $0 ASC ]
├── value indices: [ 1, 2, 3 ]
├── distribution key: [ 0 ]
├── read pk prefix len hint: 1
└── vnode column idx: 0
Table 4294967294
├── columns: [ first_value ]
├── primary key: []
├── value indices: [ 0 ]
├── distribution key: []
└── read pk prefix len hint: 0
- sql: |
create table t (x int, y int);
select last_value(x order by y desc nulls last) from t;
Expand Down
19 changes: 16 additions & 3 deletions src/frontend/src/optimizer/plan_node/generic/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::fmt;
use std::{fmt, vec};

use fixedbitset::FixedBitSet;
use itertools::{Either, Itertools};
Expand Down Expand Up @@ -348,6 +348,7 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
let in_dist_key = self.input.distribution().dist_column_indices().to_vec();

let gen_materialized_input_state = |sort_keys: Vec<(OrderType, usize)>,
extra_keys: Vec<usize>,
include_keys: Vec<usize>|
-> MaterializedInputState {
let (mut table_builder, mut included_upstream_indices, mut column_mapping) =
Expand Down Expand Up @@ -375,7 +376,7 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
for (order_type, idx) in sort_keys {
add_column(idx, Some(order_type), true, &mut table_builder);
}
for &idx in &in_pks {
for idx in extra_keys {
add_column(idx, Some(OrderType::ascending()), true, &mut table_builder);
}
for idx in include_keys {
Expand Down Expand Up @@ -458,6 +459,17 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
_ => unreachable!(),
}
};

// columns to ensure each row unique
let extra_keys = if agg_call.distinct {
// if distinct, use distinct keys as extra keys
let distinct_key = agg_call.inputs[0].index;
vec![distinct_key]
} else {
// if not distinct, use primary keys as extra keys
in_pks.clone()
};

// other columns that should be contained in state table
let include_keys = match agg_call.agg_kind {
AggKind::FirstValue
Expand All @@ -470,7 +482,8 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
}
_ => vec![],
};
let state = gen_materialized_input_state(sort_keys, include_keys);

let state = gen_materialized_input_state(sort_keys, extra_keys, include_keys);
AggCallState::MaterializedInput(Box::new(state))
}
agg_kinds::rewritten!() => {
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ impl StreamNode for StreamHashAgg {
.collect(),
row_count_index: self.row_count_idx as u32,
emit_on_window_close: self.base.emit_on_window_close(),
version: PbAggNodeVersion::Issue12140 as _,
})
}
}
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ impl StreamNode for StreamSimpleAgg {
})
.collect(),
row_count_index: self.row_count_idx as u32,
version: PbAggNodeVersion::Issue12140 as _,
})
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ impl StreamNode for StreamStatelessSimpleAgg {
intermediate_state_table: None,
is_append_only: self.input().append_only(),
distinct_dedup_tables: Default::default(),
version: AggNodeVersion::Issue12140 as _,
})
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/stream/src/executor/agg_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use risingwave_expr::aggregate::AggCall;
use risingwave_pb::stream_plan::PbAggNodeVersion;
use risingwave_storage::StateStore;

use super::aggregation::AggStateStorage;
Expand All @@ -27,6 +28,8 @@ use crate::task::AtomicU64Ref;

/// Arguments needed to construct an `XxxAggExecutor`.
pub struct AggExecutorArgs<S: StateStore, E: AggExecutorExtraArgs> {
pub version: PbAggNodeVersion,

// basic
pub input: Box<dyn Executor>,
pub actor_ctx: ActorContextRef,
Expand Down
Loading

0 comments on commit c954abe

Please sign in to comment.