From f8de4a96364bbf81dfb10ad38e5e283c3e6d85ca Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Sun, 15 Oct 2023 23:56:05 -0500 Subject: [PATCH 1/3] feat(agg): support `jsonb_agg` and `jsonb_object_agg` in streaming mode (#12836) Signed-off-by: Richard Chien --- e2e_test/streaming/aggregate/jsonb_agg.slt | 46 +++++++++++++++++++ src/expr/core/src/aggregate/def.rs | 6 +-- .../src/optimizer/plan_node/generic/agg.rs | 18 ++++++-- src/stream/src/executor/aggregation/minput.rs | 5 +- 4 files changed, 66 insertions(+), 9 deletions(-) create mode 100644 e2e_test/streaming/aggregate/jsonb_agg.slt diff --git a/e2e_test/streaming/aggregate/jsonb_agg.slt b/e2e_test/streaming/aggregate/jsonb_agg.slt new file mode 100644 index 0000000000000..18cb80cc69085 --- /dev/null +++ b/e2e_test/streaming/aggregate/jsonb_agg.slt @@ -0,0 +1,46 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +create table t(v1 boolean, v2 int, v3 varchar, v4 jsonb); + +statement ok +create materialized view mv_tmp as +select jsonb_agg(v1) as j1 from t; + +statement ok +drop materialized view mv_tmp; + +statement ok +create materialized view mv1 as +select + jsonb_agg(v1 order by v2) as j1, + jsonb_agg(v2 order by v2) as j2, + jsonb_object_agg(v3, v4) as j3 +from t; + +statement ok +insert into t values + (null, 2, 'bbb', null), + (false, 1, 'ccc', 'null'); + +query TTT +select * from mv1; +---- +[false, null] [1, 2] {"bbb": null, "ccc": null} + +statement ok +insert into t values + (true, 0, 'bbb', '999'), + (true, 8, 'ddd', '{"foo": "bar"}'); + +query TTT +select * from mv1; +---- +[true, false, null, true] [0, 1, 2, 8] {"bbb": 999, "ccc": null, "ddd": {"foo": "bar"}} + +statement ok +drop materialized view mv1; + +statement ok +drop table t; diff --git a/src/expr/core/src/aggregate/def.rs b/src/expr/core/src/aggregate/def.rs index 39d4c158c10d7..f71bfd454a415 100644 --- a/src/expr/core/src/aggregate/def.rs +++ b/src/expr/core/src/aggregate/def.rs @@ -308,11 +308,7 @@ pub mod agg_kinds { #[macro_export] macro_rules! 
unimplemented_in_stream { () => { - AggKind::JsonbAgg - | AggKind::JsonbObjectAgg - | AggKind::PercentileCont - | AggKind::PercentileDisc - | AggKind::Mode + AggKind::PercentileCont | AggKind::PercentileDisc | AggKind::Mode }; } pub use unimplemented_in_stream; diff --git a/src/frontend/src/optimizer/plan_node/generic/agg.rs b/src/frontend/src/optimizer/plan_node/generic/agg.rs index 107eec5e51b01..2fb251ca89aa6 100644 --- a/src/frontend/src/optimizer/plan_node/generic/agg.rs +++ b/src/frontend/src/optimizer/plan_node/generic/agg.rs @@ -412,7 +412,9 @@ impl Agg { | AggKind::FirstValue | AggKind::LastValue | AggKind::StringAgg - | AggKind::ArrayAgg => { + | AggKind::ArrayAgg + | AggKind::JsonbAgg + | AggKind::JsonbObjectAgg => { // columns with order requirement in state table let sort_keys = { match agg_call.agg_kind { @@ -425,7 +427,8 @@ impl Agg { AggKind::FirstValue | AggKind::LastValue | AggKind::StringAgg - | AggKind::ArrayAgg => { + | AggKind::ArrayAgg + | AggKind::JsonbAgg => { if agg_call.order_by.is_empty() { me.ctx().warn_to_user(format!( "{} without ORDER BY may produce non-deterministic result", @@ -447,6 +450,11 @@ impl Agg { }) .collect() } + AggKind::JsonbObjectAgg => agg_call + .order_by + .iter() + .map(|o| (o.order_type, o.column_index)) + .collect(), _ => unreachable!(), } }; @@ -455,7 +463,11 @@ impl Agg { AggKind::FirstValue | AggKind::LastValue | AggKind::StringAgg - | AggKind::ArrayAgg => agg_call.inputs.iter().map(|i| i.index).collect(), + | AggKind::ArrayAgg + | AggKind::JsonbAgg + | AggKind::JsonbObjectAgg => { + agg_call.inputs.iter().map(|i| i.index).collect() + } _ => vec![], }; let state = gen_materialized_input_state(sort_keys, include_keys); diff --git a/src/stream/src/executor/aggregation/minput.rs b/src/stream/src/executor/aggregation/minput.rs index 78c7d484385e8..1329f08eb6d99 100644 --- a/src/stream/src/executor/aggregation/minput.rs +++ b/src/stream/src/executor/aggregation/minput.rs @@ -139,7 +139,10 @@ impl MaterializedInputState { agg_call.args.arg_types(), )) } - AggKind::StringAgg | AggKind::ArrayAgg => Box::new(GenericAggStateCache::new( + AggKind::StringAgg + | AggKind::ArrayAgg + | AggKind::JsonbAgg + | AggKind::JsonbObjectAgg => Box::new(GenericAggStateCache::new( OrderedStateCache::new(), agg_call.args.arg_types(), )), From 0aa9f7ed7fe8af6ea19015b9d8bf271a128d4bee Mon Sep 17 00:00:00 2001 From: Noel Kwan <47273164+kwannoel@users.noreply.github.com> Date: Mon, 16 Oct 2023 13:13:01 +0800 Subject: [PATCH 2/3] fix(sqlsmith): fix generation of decode (#12855) --- src/tests/sqlsmith/src/sql_gen/functions.rs | 11 +++++++++++ src/tests/sqlsmith/src/sql_gen/types.rs | 8 +++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/src/tests/sqlsmith/src/sql_gen/functions.rs b/src/tests/sqlsmith/src/sql_gen/functions.rs index 3583b820f1204..01cbb0604d262 100644 --- a/src/tests/sqlsmith/src/sql_gen/functions.rs +++ b/src/tests/sqlsmith/src/sql_gen/functions.rs @@ -49,6 +49,7 @@ impl<'a, R: Rng> SqlGenerator<'a, R> { 4 => self.gen_overlay(context), _ => unreachable!(), }, + T::Bytea => self.gen_decode(context), _ => match self.rng.gen_bool(0.5) { true => self.gen_case(ret, context), false => self.gen_coalesce(ret, context), @@ -121,6 +122,16 @@ impl<'a, R: Rng> SqlGenerator<'a, R> { .collect() } + fn gen_decode(&mut self, context: SqlGeneratorContext) -> Expr { + let input_string = self.gen_expr(&DataType::Varchar, context); + let encoding = &["base64", "hex", "escape"].choose(&mut self.rng).unwrap(); + let args = vec![ + input_string, + 
Expr::Value(Value::SingleQuotedString(encoding.to_string())), + ]; + Expr::Function(make_simple_func("decode", &args)) + } + fn gen_fixed_func(&mut self, ret: &DataType, context: SqlGeneratorContext) -> Expr { let funcs = match FUNC_TABLE.get(ret) { None => return self.gen_simple_scalar(ret), diff --git a/src/tests/sqlsmith/src/sql_gen/types.rs b/src/tests/sqlsmith/src/sql_gen/types.rs index ea3c00e45e1da..06d170e604ace 100644 --- a/src/tests/sqlsmith/src/sql_gen/types.rs +++ b/src/tests/sqlsmith/src/sql_gen/types.rs @@ -109,7 +109,10 @@ impl TryFrom for CastSig { /// effectiveness, e.g. cause it to crash. static FUNC_BAN_LIST: LazyLock> = LazyLock::new(|| { [ - ExprType::Repeat, // FIXME: https://github.com/risingwavelabs/risingwave/issues/8003 + // FIXME: https://github.com/risingwavelabs/risingwave/issues/8003 + ExprType::Repeat, + // The format argument needs to be handled specially. It is still generated in `gen_special_func`. + ExprType::Decode, ] .into_iter() .collect() @@ -117,6 +120,9 @@ static FUNC_BAN_LIST: LazyLock> = LazyLock::new(|| { /// Table which maps functions' return types to possible function signatures. // ENABLE: https://github.com/risingwavelabs/risingwave/issues/5826 +// TODO: Create a `SPECIAL_FUNC` table. +// Otherwise when we dump the function table, we won't include those functions in +// gen_special_func. pub(crate) static FUNC_TABLE: LazyLock>> = LazyLock::new(|| { let mut funcs = HashMap::>::new(); From d64d2fab70b944088d4dc295ac8eed83a902cee1 Mon Sep 17 00:00:00 2001 From: Dylan Date: Mon, 16 Oct 2023 14:22:56 +0800 Subject: [PATCH 3/3] feat(optimizer): improve union stream key (#12837) --- .../tests/testdata/input/union.yaml | 42 ++ .../tests/testdata/output/share.yaml | 4 +- .../tests/testdata/output/union.yaml | 403 +++++++++++++++++- .../tests/testdata/output/watermark.yaml | 18 +- .../src/optimizer/plan_node/generic/union.rs | 8 +- .../src/optimizer/plan_node/logical_union.rs | 91 ++-- 6 files changed, 499 insertions(+), 67 deletions(-) diff --git a/src/frontend/planner_test/tests/testdata/input/union.yaml b/src/frontend/planner_test/tests/testdata/input/union.yaml index 2d7a005d12e21..8775d4f9d36f2 100644 --- a/src/frontend/planner_test/tests/testdata/input/union.yaml +++ b/src/frontend/planner_test/tests/testdata/input/union.yaml @@ -53,3 +53,45 @@ expected_outputs: - batch_plan - optimized_logical_plan_for_batch +- name: test merged union stream key (2 columns, row_id + src_col) + sql: | + create table t1 (a int, b numeric, c bigint); + create table t2 (a int, b numeric, c bigint); + create table t3 (a int, b numeric, c bigint); + create table t4 (a int, b numeric, c bigint); + create table t5 (a int, b numeric, c bigint); + select * from t1 union all select * from t2 union all select * from t3 union all select * from t4 union all select * from t5; + expected_outputs: + - batch_plan + - stream_plan + - stream_dist_plan +- name: test merged union stream key (5 columns, row_id + src_col + a + b + c) + sql: | + create table t1 (a int, b numeric, c bigint, primary key (a)); + create table t2 (a int, b numeric, c bigint, primary key (b)); + create table t3 (a int, b numeric, c bigint, primary key (c)); + create table t4 (a int, b numeric, c bigint); + create table t5 (a int, b numeric, c bigint, primary key (a, b)); + select * from t1 union all select * from t2 union all select * from t3 union all select * from t4 union all select * from t5; + expected_outputs: + - stream_dist_plan +- name: test merged union stream key (4 columns, row_id + src_col + a + 
b) + sql: | + create table t1 (a int, b numeric, c bigint, primary key (a)); + create table t2 (a int, b numeric, c bigint, primary key (b)); + create table t3 (a int, b numeric, c bigint); + create table t4 (a int, b numeric, c bigint); + create table t5 (a int, b numeric, c bigint, primary key (a, b)); + select * from t1 union all select * from t2 union all select * from t3 union all select * from t4 union all select * from t5; + expected_outputs: + - stream_dist_plan +- name: test merged union stream key (3 columns, src_col + a + b) + sql: | + create table t1 (a int, b numeric, c bigint, primary key (a)); + create table t2 (a int, b numeric, c bigint, primary key (b)); + create table t3 (a int, b numeric, c bigint, primary key (b)); + create table t4 (a int, b numeric, c bigint, primary key (b, a)); + create table t5 (a int, b numeric, c bigint, primary key (a, b)); + select * from t1 union all select * from t2 union all select * from t3 union all select * from t4 union all select * from t5; + expected_outputs: + - stream_dist_plan diff --git a/src/frontend/planner_test/tests/testdata/output/share.yaml b/src/frontend/planner_test/tests/testdata/output/share.yaml index 4495a3deeaaf9..2815b00784b1d 100644 --- a/src/frontend/planner_test/tests/testdata/output/share.yaml +++ b/src/frontend/planner_test/tests/testdata/output/share.yaml @@ -150,7 +150,7 @@ create table t(a int, b int); with cte as (select count(*) from t) select * from cte union all select * from cte; stream_plan: |- - StreamMaterialize { columns: [count, 0:Int32(hidden)], stream_key: [0:Int32], pk_columns: [0:Int32], pk_conflict: NoCheck } + StreamMaterialize { columns: [count, $src(hidden)], stream_key: [$src], pk_columns: [$src], pk_conflict: NoCheck } └─StreamUnion { all: true } ├─StreamExchange { dist: HashShard(0:Int32) } │ └─StreamProject { exprs: [sum0(count), 0:Int32] } @@ -173,7 +173,7 @@ create table t(a int, b int); with cte as (select count(*) from t) select * from cte union all select * from cte; stream_plan: |- - StreamMaterialize { columns: [count, 0:Int32(hidden)], stream_key: [0:Int32], pk_columns: [0:Int32], pk_conflict: NoCheck } + StreamMaterialize { columns: [count, $src(hidden)], stream_key: [$src], pk_columns: [$src], pk_conflict: NoCheck } └─StreamUnion { all: true } ├─StreamExchange { dist: HashShard(0:Int32) } │ └─StreamProject { exprs: [sum0(count), 0:Int32] } diff --git a/src/frontend/planner_test/tests/testdata/output/union.yaml b/src/frontend/planner_test/tests/testdata/output/union.yaml index bc4c1fd4f1d50..14e7b7e65cb70 100644 --- a/src/frontend/planner_test/tests/testdata/output/union.yaml +++ b/src/frontend/planner_test/tests/testdata/output/union.yaml @@ -10,39 +10,56 @@ └─BatchExchange { order: [], dist: Single } └─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], distribution: SomeShard } stream_plan: |- - StreamMaterialize { columns: [a, b, c, t1._row_id(hidden), null:Serial(hidden), 0:Int32(hidden)], stream_key: [t1._row_id, null:Serial, 0:Int32], pk_columns: [t1._row_id, null:Serial, 0:Int32], pk_conflict: NoCheck } + StreamMaterialize { columns: [a, b, c, t1._row_id(hidden), $src(hidden)], stream_key: [t1._row_id, $src], pk_columns: [t1._row_id, $src], pk_conflict: NoCheck } └─StreamUnion { all: true } - ├─StreamExchange { dist: HashShard(t1._row_id, null:Serial, 0:Int32) } - │ └─StreamProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, null:Serial, 0:Int32] } + ├─StreamExchange { dist: HashShard(t1._row_id, 0:Int32) } + │ └─StreamProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, 0:Int32] 
} │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1.c, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } - └─StreamExchange { dist: HashShard(null:Serial, t2._row_id, 1:Int32) } - └─StreamProject { exprs: [t2.a, t2.b, t2.c, null:Serial, t2._row_id, 1:Int32] } + └─StreamExchange { dist: HashShard(t2._row_id, 1:Int32) } + └─StreamProject { exprs: [t2.a, t2.b, t2.c, t2._row_id, 1:Int32] } └─StreamTableScan { table: t2, columns: [t2.a, t2.b, t2.c, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) } stream_dist_plan: |+ Fragment 0 - StreamMaterialize { columns: [a, b, c, t1._row_id(hidden), null:Serial(hidden), 0:Int32(hidden)], stream_key: [t1._row_id, null:Serial, 0:Int32], pk_columns: [t1._row_id, null:Serial, 0:Int32], pk_conflict: NoCheck } + StreamMaterialize { columns: [a, b, c, t1._row_id(hidden), $src(hidden)], stream_key: [t1._row_id, $src], pk_columns: [t1._row_id, $src], pk_conflict: NoCheck } ├── materialized table: 4294967294 └── StreamUnion { all: true } - ├── StreamExchange Hash([3, 4, 5]) from 1 - └── StreamExchange Hash([3, 4, 5]) from 2 + ├── StreamExchange Hash([3, 4]) from 1 + └── StreamExchange Hash([3, 4]) from 2 Fragment 1 - StreamProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, null:Serial, 0:Int32] } + StreamProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, 0:Int32] } └── Chain { table: t1, columns: [t1.a, t1.b, t1.c, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } { state table: 0 } ├── Upstream └── BatchPlanNode Fragment 2 - StreamProject { exprs: [t2.a, t2.b, t2.c, null:Serial, t2._row_id, 1:Int32] } + StreamProject { exprs: [t2.a, t2.b, t2.c, t2._row_id, 1:Int32] } └── Chain { table: t2, columns: [t2.a, t2.b, t2.c, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) } { state table: 1 } ├── Upstream └── BatchPlanNode - Table 0 { columns: [ vnode, _row_id, t1_backfill_finished, t1_row_count ], primary key: [ $0 ASC ], value indices: [ 1, 2, 3 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } + Table 0 + ├── columns: [ vnode, _row_id, t1_backfill_finished, t1_row_count ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2, 3 ] + ├── distribution key: [ 0 ] + ├── read pk prefix len hint: 1 + └── vnode column idx: 0 - Table 1 { columns: [ vnode, _row_id, t2_backfill_finished, t2_row_count ], primary key: [ $0 ASC ], value indices: [ 1, 2, 3 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } + Table 1 + ├── columns: [ vnode, _row_id, t2_backfill_finished, t2_row_count ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2, 3 ] + ├── distribution key: [ 0 ] + ├── read pk prefix len hint: 1 + └── vnode column idx: 0 - Table 4294967294 { columns: [ a, b, c, t1._row_id, null:Serial, 0:Int32 ], primary key: [ $3 ASC, $4 ASC, $5 ASC ], value indices: [ 0, 1, 2, 3, 4, 5 ], distribution key: [ 3, 4, 5 ], read pk prefix len hint: 3 } + Table 4294967294 + ├── columns: [ a, b, c, t1._row_id, $src ] + ├── primary key: [ $3 ASC, $4 ASC ] + ├── value indices: [ 0, 1, 2, 3, 4 ] + ├── distribution key: [ 3, 4 ] + └── read pk prefix len hint: 2 - sql: | create table t1 (a int, b numeric, c bigint); @@ -68,11 +85,11 @@ └─StreamHashAgg { group_key: [t1.a, t1.b, t1.c], aggs: [count] } └─StreamExchange { dist: HashShard(t1.a, t1.b, t1.c) } └─StreamUnion { all: true } - ├─StreamExchange { dist: HashShard(t1._row_id, null:Serial, 0:Int32) } - │ └─StreamProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, null:Serial, 0:Int32] } + ├─StreamExchange { dist: 
HashShard(t1._row_id, 0:Int32) } + │ └─StreamProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, 0:Int32] } │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1.c, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } - └─StreamExchange { dist: HashShard(null:Serial, t2._row_id, 1:Int32) } - └─StreamProject { exprs: [t2.a, t2.b, t2.c, null:Serial, t2._row_id, 1:Int32] } + └─StreamExchange { dist: HashShard(t2._row_id, 1:Int32) } + └─StreamProject { exprs: [t2.a, t2.b, t2.c, t2._row_id, 1:Int32] } └─StreamTableScan { table: t2, columns: [t2.a, t2.b, t2.c, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) } stream_dist_plan: |+ Fragment 0 @@ -87,18 +104,18 @@ Fragment 1 StreamUnion { all: true } - ├── StreamExchange Hash([3, 4, 5]) from 2 - └── StreamExchange Hash([3, 4, 5]) from 3 + ├── StreamExchange Hash([3, 4]) from 2 + └── StreamExchange Hash([3, 4]) from 3 Fragment 2 - StreamProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, null:Serial, 0:Int32] } + StreamProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, 0:Int32] } └── Chain { table: t1, columns: [t1.a, t1.b, t1.c, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } ├── state table: 1 ├── Upstream └── BatchPlanNode Fragment 3 - StreamProject { exprs: [t2.a, t2.b, t2.c, null:Serial, t2._row_id, 1:Int32] } + StreamProject { exprs: [t2.a, t2.b, t2.c, t2._row_id, 1:Int32] } └── Chain { table: t2, columns: [t2.a, t2.b, t2.c, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) } ├── state table: 2 ├── Upstream @@ -292,3 +309,347 @@ └─BatchHashAgg { group_key: [1:Int32], aggs: [] } └─BatchExchange { order: [], dist: HashShard(1:Int32) } └─BatchValues { rows: [[1:Int32], [2:Int32], [3:Int32], [4:Int32], [5:Int32], [5:Int32]] } +- name: test merged union stream key (2 columns, row_id + src_col) + sql: | + create table t1 (a int, b numeric, c bigint); + create table t2 (a int, b numeric, c bigint); + create table t3 (a int, b numeric, c bigint); + create table t4 (a int, b numeric, c bigint); + create table t5 (a int, b numeric, c bigint); + select * from t1 union all select * from t2 union all select * from t3 union all select * from t4 union all select * from t5; + batch_plan: |- + BatchUnion { all: true } + ├─BatchExchange { order: [], dist: Single } + │ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], distribution: SomeShard } + ├─BatchExchange { order: [], dist: Single } + │ └─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], distribution: SomeShard } + ├─BatchExchange { order: [], dist: Single } + │ └─BatchScan { table: t3, columns: [t3.a, t3.b, t3.c], distribution: SomeShard } + ├─BatchExchange { order: [], dist: Single } + │ └─BatchScan { table: t4, columns: [t4.a, t4.b, t4.c], distribution: SomeShard } + └─BatchExchange { order: [], dist: Single } + └─BatchScan { table: t5, columns: [t5.a, t5.b, t5.c], distribution: SomeShard } + stream_plan: |- + StreamMaterialize { columns: [a, b, c, t1._row_id(hidden), $src(hidden)], stream_key: [t1._row_id, $src], pk_columns: [t1._row_id, $src], pk_conflict: NoCheck } + └─StreamUnion { all: true } + ├─StreamExchange { dist: HashShard(t1._row_id, 0:Int32) } + │ └─StreamProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, 0:Int32] } + │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1.c, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } + ├─StreamExchange { dist: HashShard(t2._row_id, 1:Int32) } + │ └─StreamProject { exprs: [t2.a, t2.b, t2.c, t2._row_id, 1:Int32] } + │ └─StreamTableScan { table: t2, columns: 
[t2.a, t2.b, t2.c, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) } + ├─StreamExchange { dist: HashShard(t3._row_id, 2:Int32) } + │ └─StreamProject { exprs: [t3.a, t3.b, t3.c, t3._row_id, 2:Int32] } + │ └─StreamTableScan { table: t3, columns: [t3.a, t3.b, t3.c, t3._row_id], pk: [t3._row_id], dist: UpstreamHashShard(t3._row_id) } + ├─StreamExchange { dist: HashShard(t4._row_id, 3:Int32) } + │ └─StreamProject { exprs: [t4.a, t4.b, t4.c, t4._row_id, 3:Int32] } + │ └─StreamTableScan { table: t4, columns: [t4.a, t4.b, t4.c, t4._row_id], pk: [t4._row_id], dist: UpstreamHashShard(t4._row_id) } + └─StreamExchange { dist: HashShard(t5._row_id, 4:Int32) } + └─StreamProject { exprs: [t5.a, t5.b, t5.c, t5._row_id, 4:Int32] } + └─StreamTableScan { table: t5, columns: [t5.a, t5.b, t5.c, t5._row_id], pk: [t5._row_id], dist: UpstreamHashShard(t5._row_id) } + stream_dist_plan: |+ + Fragment 0 + StreamMaterialize { columns: [a, b, c, t1._row_id(hidden), $src(hidden)], stream_key: [t1._row_id, $src], pk_columns: [t1._row_id, $src], pk_conflict: NoCheck } + ├── materialized table: 4294967294 + └── StreamUnion { all: true } + ├── StreamExchange Hash([3, 4]) from 1 + ├── StreamExchange Hash([3, 4]) from 2 + ├── StreamExchange Hash([3, 4]) from 3 + ├── StreamExchange Hash([3, 4]) from 4 + └── StreamExchange Hash([3, 4]) from 5 + + Fragment 1 + StreamProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, 0:Int32] } + └── Chain { table: t1, columns: [t1.a, t1.b, t1.c, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } { state table: 0 } + ├── Upstream + └── BatchPlanNode + + Fragment 2 + StreamProject { exprs: [t2.a, t2.b, t2.c, t2._row_id, 1:Int32] } + └── Chain { table: t2, columns: [t2.a, t2.b, t2.c, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) } { state table: 1 } + ├── Upstream + └── BatchPlanNode + + Fragment 3 + StreamProject { exprs: [t3.a, t3.b, t3.c, t3._row_id, 2:Int32] } + └── Chain { table: t3, columns: [t3.a, t3.b, t3.c, t3._row_id], pk: [t3._row_id], dist: UpstreamHashShard(t3._row_id) } { state table: 2 } + ├── Upstream + └── BatchPlanNode + + Fragment 4 + StreamProject { exprs: [t4.a, t4.b, t4.c, t4._row_id, 3:Int32] } + └── Chain { table: t4, columns: [t4.a, t4.b, t4.c, t4._row_id], pk: [t4._row_id], dist: UpstreamHashShard(t4._row_id) } { state table: 3 } + ├── Upstream + └── BatchPlanNode + + Fragment 5 + StreamProject { exprs: [t5.a, t5.b, t5.c, t5._row_id, 4:Int32] } + └── Chain { table: t5, columns: [t5.a, t5.b, t5.c, t5._row_id], pk: [t5._row_id], dist: UpstreamHashShard(t5._row_id) } { state table: 4 } + ├── Upstream + └── BatchPlanNode + + Table 0 + ├── columns: [ vnode, _row_id, t1_backfill_finished, t1_row_count ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2, 3 ] + ├── distribution key: [ 0 ] + ├── read pk prefix len hint: 1 + └── vnode column idx: 0 + + Table 1 + ├── columns: [ vnode, _row_id, t2_backfill_finished, t2_row_count ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2, 3 ] + ├── distribution key: [ 0 ] + ├── read pk prefix len hint: 1 + └── vnode column idx: 0 + + Table 2 + ├── columns: [ vnode, _row_id, t3_backfill_finished, t3_row_count ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2, 3 ] + ├── distribution key: [ 0 ] + ├── read pk prefix len hint: 1 + └── vnode column idx: 0 + + Table 3 + ├── columns: [ vnode, _row_id, t4_backfill_finished, t4_row_count ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2, 3 ] + ├── distribution key: [ 0 ] + ├── read pk prefix len hint: 1 + └── 
vnode column idx: 0 + + Table 4 + ├── columns: [ vnode, _row_id, t5_backfill_finished, t5_row_count ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2, 3 ] + ├── distribution key: [ 0 ] + ├── read pk prefix len hint: 1 + └── vnode column idx: 0 + + Table 4294967294 + ├── columns: [ a, b, c, t1._row_id, $src ] + ├── primary key: [ $3 ASC, $4 ASC ] + ├── value indices: [ 0, 1, 2, 3, 4 ] + ├── distribution key: [ 3, 4 ] + └── read pk prefix len hint: 2 + +- name: test merged union stream key (5 columns, row_id + src_col + a + b + c) + sql: | + create table t1 (a int, b numeric, c bigint, primary key (a)); + create table t2 (a int, b numeric, c bigint, primary key (b)); + create table t3 (a int, b numeric, c bigint, primary key (c)); + create table t4 (a int, b numeric, c bigint); + create table t5 (a int, b numeric, c bigint, primary key (a, b)); + select * from t1 union all select * from t2 union all select * from t3 union all select * from t4 union all select * from t5; + stream_dist_plan: |+ + Fragment 0 + StreamMaterialize { columns: [a, b, c, t1.a(hidden), null:Int64(hidden), null:Decimal(hidden), null:Serial(hidden), $src(hidden)], stream_key: [t1.a, null:Decimal, null:Int64, null:Serial, $src], pk_columns: [t1.a, null:Decimal, null:Int64, null:Serial, $src], pk_conflict: NoCheck } + ├── materialized table: 4294967294 + └── StreamUnion { all: true } + ├── StreamExchange Hash([3, 5, 4, 6, 7]) from 1 + ├── StreamExchange Hash([3, 5, 4, 6, 7]) from 2 + ├── StreamExchange Hash([3, 5, 4, 6, 7]) from 3 + ├── StreamExchange Hash([3, 5, 4, 6, 7]) from 4 + └── StreamExchange Hash([3, 5, 4, 6, 7]) from 5 + + Fragment 1 + StreamProject { exprs: [t1.a, t1.b, t1.c, t1.a, null:Int64, null:Decimal, null:Serial, 0:Int32] } + └── Chain { table: t1, columns: [t1.a, t1.b, t1.c], pk: [t1.a], dist: UpstreamHashShard(t1.a) } { state table: 0 } + ├── Upstream + └── BatchPlanNode + + Fragment 2 + StreamProject { exprs: [t2.a, t2.b, t2.c, null:Int32, null:Int64, t2.b, null:Serial, 1:Int32] } + └── Chain { table: t2, columns: [t2.a, t2.b, t2.c], pk: [t2.b], dist: UpstreamHashShard(t2.b) } { state table: 1 } + ├── Upstream + └── BatchPlanNode + + Fragment 3 + StreamProject { exprs: [t3.a, t3.b, t3.c, null:Int32, t3.c, null:Decimal, null:Serial, 2:Int32] } + └── Chain { table: t3, columns: [t3.a, t3.b, t3.c], pk: [t3.c], dist: UpstreamHashShard(t3.c) } { state table: 2 } + ├── Upstream + └── BatchPlanNode + + Fragment 4 + StreamProject { exprs: [t4.a, t4.b, t4.c, null:Int32, null:Int64, null:Decimal, t4._row_id, 3:Int32] } + └── Chain { table: t4, columns: [t4.a, t4.b, t4.c, t4._row_id], pk: [t4._row_id], dist: UpstreamHashShard(t4._row_id) } { state table: 3 } + ├── Upstream + └── BatchPlanNode + + Fragment 5 + StreamProject { exprs: [t5.a, t5.b, t5.c, t5.a, null:Int64, t5.b, null:Serial, 4:Int32] } + └── Chain { table: t5, columns: [t5.a, t5.b, t5.c], pk: [t5.a, t5.b], dist: UpstreamHashShard(t5.a, t5.b) } { state table: 4 } + ├── Upstream + └── BatchPlanNode + + Table 0 { columns: [ vnode, a, t1_backfill_finished, t1_row_count ], primary key: [ $0 ASC ], value indices: [ 1, 2, 3 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } + + Table 1 { columns: [ vnode, b, t2_backfill_finished, t2_row_count ], primary key: [ $0 ASC ], value indices: [ 1, 2, 3 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } + + Table 2 { columns: [ vnode, c, t3_backfill_finished, t3_row_count ], primary key: [ $0 ASC ], value indices: [ 1, 2, 3 ], distribution key: [ 0 ], 
read pk prefix len hint: 1, vnode column idx: 0 } + + Table 3 { columns: [ vnode, _row_id, t4_backfill_finished, t4_row_count ], primary key: [ $0 ASC ], value indices: [ 1, 2, 3 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } + + Table 4 { columns: [ vnode, a, b, t5_backfill_finished, t5_row_count ], primary key: [ $0 ASC ], value indices: [ 1, 2, 3, 4 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } + + Table 4294967294 { columns: [ a, b, c, t1.a, null:Int64, null:Decimal, null:Serial, $src ], primary key: [ $3 ASC, $5 ASC, $4 ASC, $6 ASC, $7 ASC ], value indices: [ 0, 1, 2, 3, 4, 5, 6, 7 ], distribution key: [ 3, 5, 4, 6, 7 ], read pk prefix len hint: 5 } + +- name: test merged union stream key (4 columns, row_id + src_col + a + b) + sql: | + create table t1 (a int, b numeric, c bigint, primary key (a)); + create table t2 (a int, b numeric, c bigint, primary key (b)); + create table t3 (a int, b numeric, c bigint); + create table t4 (a int, b numeric, c bigint); + create table t5 (a int, b numeric, c bigint, primary key (a, b)); + select * from t1 union all select * from t2 union all select * from t3 union all select * from t4 union all select * from t5; + stream_dist_plan: |+ + Fragment 0 + StreamMaterialize { columns: [a, b, c, t1.a(hidden), null:Decimal(hidden), null:Serial(hidden), $src(hidden)], stream_key: [t1.a, null:Decimal, null:Serial, $src], pk_columns: [t1.a, null:Decimal, null:Serial, $src], pk_conflict: NoCheck } + ├── materialized table: 4294967294 + └── StreamUnion { all: true } + ├── StreamExchange Hash([3, 4, 5, 6]) from 1 + ├── StreamExchange Hash([3, 4, 5, 6]) from 2 + ├── StreamExchange Hash([3, 4, 5, 6]) from 3 + ├── StreamExchange Hash([3, 4, 5, 6]) from 4 + └── StreamExchange Hash([3, 4, 5, 6]) from 5 + + Fragment 1 + StreamProject { exprs: [t1.a, t1.b, t1.c, t1.a, null:Decimal, null:Serial, 0:Int32] } + └── Chain { table: t1, columns: [t1.a, t1.b, t1.c], pk: [t1.a], dist: UpstreamHashShard(t1.a) } { state table: 0 } + ├── Upstream + └── BatchPlanNode + + Fragment 2 + StreamProject { exprs: [t2.a, t2.b, t2.c, null:Int32, t2.b, null:Serial, 1:Int32] } + └── Chain { table: t2, columns: [t2.a, t2.b, t2.c], pk: [t2.b], dist: UpstreamHashShard(t2.b) } { state table: 1 } + ├── Upstream + └── BatchPlanNode + + Fragment 3 + StreamProject { exprs: [t3.a, t3.b, t3.c, null:Int32, null:Decimal, t3._row_id, 2:Int32] } + └── Chain { table: t3, columns: [t3.a, t3.b, t3.c, t3._row_id], pk: [t3._row_id], dist: UpstreamHashShard(t3._row_id) } { state table: 2 } + ├── Upstream + └── BatchPlanNode + + Fragment 4 + StreamProject { exprs: [t4.a, t4.b, t4.c, null:Int32, null:Decimal, t4._row_id, 3:Int32] } + └── Chain { table: t4, columns: [t4.a, t4.b, t4.c, t4._row_id], pk: [t4._row_id], dist: UpstreamHashShard(t4._row_id) } { state table: 3 } + ├── Upstream + └── BatchPlanNode + + Fragment 5 + StreamProject { exprs: [t5.a, t5.b, t5.c, t5.a, t5.b, null:Serial, 4:Int32] } + └── Chain { table: t5, columns: [t5.a, t5.b, t5.c], pk: [t5.a, t5.b], dist: UpstreamHashShard(t5.a, t5.b) } { state table: 4 } + ├── Upstream + └── BatchPlanNode + + Table 0 { columns: [ vnode, a, t1_backfill_finished, t1_row_count ], primary key: [ $0 ASC ], value indices: [ 1, 2, 3 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } + + Table 1 { columns: [ vnode, b, t2_backfill_finished, t2_row_count ], primary key: [ $0 ASC ], value indices: [ 1, 2, 3 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } 
+ + Table 2 { columns: [ vnode, _row_id, t3_backfill_finished, t3_row_count ], primary key: [ $0 ASC ], value indices: [ 1, 2, 3 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } + + Table 3 { columns: [ vnode, _row_id, t4_backfill_finished, t4_row_count ], primary key: [ $0 ASC ], value indices: [ 1, 2, 3 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } + + Table 4 { columns: [ vnode, a, b, t5_backfill_finished, t5_row_count ], primary key: [ $0 ASC ], value indices: [ 1, 2, 3, 4 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } + + Table 4294967294 { columns: [ a, b, c, t1.a, null:Decimal, null:Serial, $src ], primary key: [ $3 ASC, $4 ASC, $5 ASC, $6 ASC ], value indices: [ 0, 1, 2, 3, 4, 5, 6 ], distribution key: [ 3, 4, 5, 6 ], read pk prefix len hint: 4 } + +- name: test merged union stream key (3 columns, src_col + a + b) + sql: | + create table t1 (a int, b numeric, c bigint, primary key (a)); + create table t2 (a int, b numeric, c bigint, primary key (b)); + create table t3 (a int, b numeric, c bigint, primary key (b)); + create table t4 (a int, b numeric, c bigint, primary key (b, a)); + create table t5 (a int, b numeric, c bigint, primary key (a, b)); + select * from t1 union all select * from t2 union all select * from t3 union all select * from t4 union all select * from t5; + stream_dist_plan: |+ + Fragment 0 + StreamMaterialize { columns: [a, b, c, $src(hidden)], stream_key: [a, b, $src], pk_columns: [a, b, $src], pk_conflict: NoCheck } + ├── materialized table: 4294967294 + └── StreamUnion { all: true } + ├── StreamExchange Hash([0, 1, 3]) from 1 + ├── StreamExchange Hash([0, 1, 3]) from 2 + ├── StreamExchange Hash([0, 1, 3]) from 3 + ├── StreamExchange Hash([0, 1, 3]) from 4 + └── StreamExchange Hash([0, 1, 3]) from 5 + + Fragment 1 + StreamProject { exprs: [t1.a, t1.b, t1.c, 0:Int32] } + └── Chain { table: t1, columns: [t1.a, t1.b, t1.c], pk: [t1.a], dist: UpstreamHashShard(t1.a) } { state table: 0 } + ├── Upstream + └── BatchPlanNode + + Fragment 2 + StreamProject { exprs: [t2.a, t2.b, t2.c, 1:Int32] } + └── Chain { table: t2, columns: [t2.a, t2.b, t2.c], pk: [t2.b], dist: UpstreamHashShard(t2.b) } { state table: 1 } + ├── Upstream + └── BatchPlanNode + + Fragment 3 + StreamProject { exprs: [t3.a, t3.b, t3.c, 2:Int32] } + └── Chain { table: t3, columns: [t3.a, t3.b, t3.c], pk: [t3.b], dist: UpstreamHashShard(t3.b) } { state table: 2 } + ├── Upstream + └── BatchPlanNode + + Fragment 4 + StreamProject { exprs: [t4.a, t4.b, t4.c, 3:Int32] } + └── Chain { table: t4, columns: [t4.a, t4.b, t4.c], pk: [t4.b, t4.a], dist: UpstreamHashShard(t4.a, t4.b) } { state table: 3 } + ├── Upstream + └── BatchPlanNode + + Fragment 5 + StreamProject { exprs: [t5.a, t5.b, t5.c, 4:Int32] } + └── Chain { table: t5, columns: [t5.a, t5.b, t5.c], pk: [t5.a, t5.b], dist: UpstreamHashShard(t5.a, t5.b) } { state table: 4 } + ├── Upstream + └── BatchPlanNode + + Table 0 + ├── columns: [ vnode, a, t1_backfill_finished, t1_row_count ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2, 3 ] + ├── distribution key: [ 0 ] + ├── read pk prefix len hint: 1 + └── vnode column idx: 0 + + Table 1 + ├── columns: [ vnode, b, t2_backfill_finished, t2_row_count ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2, 3 ] + ├── distribution key: [ 0 ] + ├── read pk prefix len hint: 1 + └── vnode column idx: 0 + + Table 2 + ├── columns: [ vnode, b, t3_backfill_finished, t3_row_count ] + ├── primary key: [ $0 ASC ] + ├── 
value indices: [ 1, 2, 3 ] + ├── distribution key: [ 0 ] + ├── read pk prefix len hint: 1 + └── vnode column idx: 0 + + Table 3 + ├── columns: [ vnode, b, a, t4_backfill_finished, t4_row_count ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2, 3, 4 ] + ├── distribution key: [ 0 ] + ├── read pk prefix len hint: 1 + └── vnode column idx: 0 + + Table 4 + ├── columns: [ vnode, a, b, t5_backfill_finished, t5_row_count ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2, 3, 4 ] + ├── distribution key: [ 0 ] + ├── read pk prefix len hint: 1 + └── vnode column idx: 0 + + Table 4294967294 + ├── columns: [ a, b, c, $src ] + ├── primary key: [ $0 ASC, $1 ASC, $3 ASC ] + ├── value indices: [ 0, 1, 2, 3 ] + ├── distribution key: [ 0, 1, 3 ] + └── read pk prefix len hint: 3 + diff --git a/src/frontend/planner_test/tests/testdata/output/watermark.yaml b/src/frontend/planner_test/tests/testdata/output/watermark.yaml index d1916a33192c6..e4ef42b121528 100644 --- a/src/frontend/planner_test/tests/testdata/output/watermark.yaml +++ b/src/frontend/planner_test/tests/testdata/output/watermark.yaml @@ -140,13 +140,13 @@ create table t2 (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only; select * from t1 Union all select * from t2; stream_plan: |- - StreamMaterialize { columns: [ts, v1, v2, t1._row_id(hidden), null:Serial(hidden), 0:Int32(hidden)], stream_key: [t1._row_id, null:Serial, 0:Int32], pk_columns: [t1._row_id, null:Serial, 0:Int32], pk_conflict: NoCheck, watermark_columns: [ts] } + StreamMaterialize { columns: [ts, v1, v2, t1._row_id(hidden), $src(hidden)], stream_key: [t1._row_id, $src], pk_columns: [t1._row_id, $src], pk_conflict: NoCheck, watermark_columns: [ts] } └─StreamUnion { all: true, output_watermarks: [t1.ts] } - ├─StreamExchange { dist: HashShard(t1._row_id, null:Serial, 0:Int32) } - │ └─StreamProject { exprs: [t1.ts, t1.v1, t1.v2, t1._row_id, null:Serial, 0:Int32], output_watermarks: [t1.ts] } + ├─StreamExchange { dist: HashShard(t1._row_id, 0:Int32) } + │ └─StreamProject { exprs: [t1.ts, t1.v1, t1.v2, t1._row_id, 0:Int32], output_watermarks: [t1.ts] } │ └─StreamTableScan { table: t1, columns: [t1.ts, t1.v1, t1.v2, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } - └─StreamExchange { dist: HashShard(null:Serial, t2._row_id, 1:Int32) } - └─StreamProject { exprs: [t2.ts, t2.v1, t2.v2, null:Serial, t2._row_id, 1:Int32], output_watermarks: [t2.ts] } + └─StreamExchange { dist: HashShard(t2._row_id, 1:Int32) } + └─StreamProject { exprs: [t2.ts, t2.v1, t2.v2, t2._row_id, 1:Int32], output_watermarks: [t2.ts] } └─StreamTableScan { table: t2, columns: [t2.ts, t2.v1, t2.v2, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) } - name: union sql: | @@ -159,11 +159,11 @@ └─StreamExchange { dist: HashShard(t1.ts, t1.v1, t1.v2) } └─StreamProject { exprs: [t1.ts, t1.v1, t1.v2], output_watermarks: [t1.ts] } └─StreamUnion { all: true, output_watermarks: [t1.ts] } - ├─StreamExchange { dist: HashShard(t1._row_id, null:Serial, 0:Int32) } - │ └─StreamProject { exprs: [t1.ts, t1.v1, t1.v2, t1._row_id, null:Serial, 0:Int32], output_watermarks: [t1.ts] } + ├─StreamExchange { dist: HashShard(t1._row_id, 0:Int32) } + │ └─StreamProject { exprs: [t1.ts, t1.v1, t1.v2, t1._row_id, 0:Int32], output_watermarks: [t1.ts] } │ └─StreamTableScan { table: t1, columns: [t1.ts, t1.v1, t1.v2, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } - └─StreamExchange { dist: HashShard(null:Serial, t2._row_id, 
1:Int32) } - └─StreamProject { exprs: [t2.ts, t2.v1, t2.v2, null:Serial, t2._row_id, 1:Int32], output_watermarks: [t2.ts] } + └─StreamExchange { dist: HashShard(t2._row_id, 1:Int32) } + └─StreamProject { exprs: [t2.ts, t2.v1, t2.v2, t2._row_id, 1:Int32], output_watermarks: [t2.ts] } └─StreamTableScan { table: t2, columns: [t2.ts, t2.v1, t2.v2, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) } - name: tumble sql: | diff --git a/src/frontend/src/optimizer/plan_node/generic/union.rs b/src/frontend/src/optimizer/plan_node/generic/union.rs index bc736eed4e153..3e6a5b9b9bab6 100644 --- a/src/frontend/src/optimizer/plan_node/generic/union.rs +++ b/src/frontend/src/optimizer/plan_node/generic/union.rs @@ -33,7 +33,13 @@ pub struct Union { impl GenericPlanNode for Union { fn schema(&self) -> Schema { - self.inputs[0].schema().clone() + let mut schema = self.inputs[0].schema().clone(); + if let Some(source_col) = self.source_col { + schema.fields[source_col].name = "$src".to_string(); + schema + } else { + schema + } } fn stream_key(&self) -> Option> { diff --git a/src/frontend/src/optimizer/plan_node/logical_union.rs b/src/frontend/src/optimizer/plan_node/logical_union.rs index 10371fda3c2b0..51e4e620cf4ca 100644 --- a/src/frontend/src/optimizer/plan_node/logical_union.rs +++ b/src/frontend/src/optimizer/plan_node/logical_union.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::max; +use std::collections::BTreeMap; + use itertools::Itertools; use risingwave_common::catalog::Schema; use risingwave_common::error::Result; @@ -174,7 +177,7 @@ impl ToStream for LogicalUnion { rewrites.push(input.logical_rewrite_for_stream(ctx)?); } - let original_schema_contain_all_input_pks = + let original_schema_contain_all_input_stream_keys = rewrites.iter().all(|(new_input, col_index_mapping)| { let original_schema_new_pos = (0..original_schema_len) .map(|x| col_index_mapping.map(x)) @@ -185,7 +188,7 @@ impl ToStream for LogicalUnion { .all(|x| original_schema_new_pos.contains(x)) }); - if original_schema_contain_all_input_pks { + if original_schema_contain_all_input_stream_keys { // Add one more column at the end of the original schema to identify the record came // from which input. [original_schema + source_col] let new_inputs = rewrites @@ -223,29 +226,45 @@ impl ToStream for LogicalUnion { Ok((new_union.into(), out_col_change)) } else { // In order to ensure all inputs have the same schema for new union, we construct new - // schema like that: [original_schema + input1_pk + input2_pk + ... + - // source_col] - let input_pk_types = rewrites - .iter() - .flat_map(|(new_input, _)| { - new_input - .expect_stream_key() - .iter() - .map(|x| new_input.schema().fields[*x].data_type()) - }) - .collect_vec(); - let input_pk_nulls = input_pk_types + // schema like that: [original_schema + merged_stream_key + source_col] + // where merged_stream_key is merged by the types of each input stream key. + // If all inputs have the same stream key column types, we have a small merged_stream_key. Otherwise, we will have a large merged_stream_key. 
+ + let (merged_stream_key_types, types_offset) = { + let mut max_types_counter = BTreeMap::default(); + for (new_input, _) in &rewrites { + let mut types_counter = BTreeMap::default(); + for x in new_input.expect_stream_key() { + types_counter + .entry(new_input.schema().fields[*x].data_type()) + .and_modify(|x| *x += 1) + .or_insert(1); + } + for (key, val) in types_counter { + max_types_counter + .entry(key) + .and_modify(|x| *x = max(*x, val)) + .or_insert(val); + } + } + + let mut merged_stream_key_types = vec![]; + let mut types_offset = BTreeMap::default(); + let mut offset = 0; + for (key, val) in max_types_counter { + let _ = types_offset.insert(key.clone(), offset); + offset += val; + merged_stream_key_types.extend(std::iter::repeat(key.clone()).take(val)); + } + + (merged_stream_key_types, types_offset) + }; + + let input_stream_key_nulls = merged_stream_key_types .iter() .map(|t| ExprImpl::Literal(Literal::new(None, t.clone()).into())) .collect_vec(); - let input_pk_lens = rewrites - .iter() - .map(|(new_input, _)| new_input.expect_stream_key().len()) - .collect_vec(); - let mut input_pk_offsets = vec![0]; - for (i, len) in input_pk_lens.into_iter().enumerate() { - input_pk_offsets.push(input_pk_offsets[i] + len) - } + let new_inputs = rewrites .into_iter() .enumerate() @@ -262,18 +281,22 @@ impl ToStream for LogicalUnion { ) }) .collect_vec(); - // input1_pk + input2_pk + ... - let mut input_pks = input_pk_nulls.clone(); - for (j, pk_idx) in new_input.expect_stream_key().iter().enumerate() { - input_pks[input_pk_offsets[i] + j] = ExprImpl::InputRef( - InputRef::new( - *pk_idx, - new_input.schema().fields[*pk_idx].data_type.clone(), - ) - .into(), - ); + // merged_stream_key + let mut input_stream_keys = input_stream_key_nulls.clone(); + let mut types_counter = BTreeMap::default(); + for stream_key_idx in new_input.expect_stream_key() { + let data_type = + new_input.schema().fields[*stream_key_idx].data_type.clone(); + let count = *types_counter + .entry(data_type.clone()) + .and_modify(|x| *x += 1) + .or_insert(1); + let type_start_offset = *types_offset.get(&data_type).unwrap(); + + input_stream_keys[type_start_offset + count - 1] = + ExprImpl::InputRef(InputRef::new(*stream_key_idx, data_type).into()); } - exprs.extend(input_pks); + exprs.extend(input_stream_keys); // source_col exprs.push(ExprImpl::Literal( Literal::new(Some((i as i32).to_scalar_value()), DataType::Int32).into(), @@ -285,7 +308,7 @@ impl ToStream for LogicalUnion { let new_union = LogicalUnion::new_with_source_col( self.all(), new_inputs, - Some(original_schema_len + input_pk_types.len()), + Some(original_schema_len + merged_stream_key_types.len()), ); // We have already used project to map rewrite input to the origin schema, so we can use // identity with the new schema len.
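
The core change in PATCH 3/3 above is how LogicalUnion builds a shared schema when its inputs' stream keys differ: for each input it counts how many stream-key columns exist of each data type, takes the per-type maximum across all inputs, and uses that as the merged stream key, padding the slots an input does not use with typed NULLs. The following standalone Rust sketch is not part of the patch; the `DataType` enum and `merge_stream_key_types` are illustrative stand-ins for the planner's real types, but the counting mirrors the `types_counter` / `max_types_counter` logic in `logical_union.rs` above.

use std::collections::BTreeMap;

#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[allow(dead_code)]
enum DataType {
    Int32,
    Decimal,
    Serial,
}

/// Per input: count stream-key columns of each type.
/// Across inputs: take the per-type maximum.
/// The merged key has exactly that many slots per type; an input whose key has
/// fewer columns of some type pads the remaining slots with NULLs of that type.
fn merge_stream_key_types(inputs: &[Vec<DataType>]) -> Vec<DataType> {
    let mut max_per_type: BTreeMap<DataType, usize> = BTreeMap::new();
    for stream_key in inputs {
        let mut per_type: BTreeMap<DataType, usize> = BTreeMap::new();
        for ty in stream_key {
            *per_type.entry(ty.clone()).or_insert(0) += 1;
        }
        for (ty, cnt) in per_type {
            let slot = max_per_type.entry(ty).or_insert(0);
            *slot = (*slot).max(cnt);
        }
    }
    // Flatten into one slot per (type, count), in deterministic BTreeMap order.
    max_per_type
        .into_iter()
        .flat_map(|(ty, cnt)| std::iter::repeat(ty).take(cnt))
        .collect()
}

fn main() {
    // Mirrors the "3 columns, src_col + a + b" planner test above: every input's
    // stream key fits into one Int32 slot (a) plus one Decimal slot (b), so no
    // extra Serial row_id slot is needed in the merged key.
    let inputs = vec![
        vec![DataType::Int32],                    // t1: primary key (a)
        vec![DataType::Decimal],                  // t2: primary key (b)
        vec![DataType::Decimal],                  // t3: primary key (b)
        vec![DataType::Decimal, DataType::Int32], // t4: primary key (b, a)
        vec![DataType::Int32, DataType::Decimal], // t5: primary key (a, b)
    ];
    // Prints [Int32, Decimal]; appending the hidden $src column yields the
    // 3-column stream key [a, b, $src] shown in the expected stream plan.
    println!("{:?}", merge_stream_key_types(&inputs));
}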