From f8de4a96364bbf81dfb10ad38e5e283c3e6d85ca Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Sun, 15 Oct 2023 23:56:05 -0500 Subject: [PATCH] feat(agg): support `jsonb_agg` and `jsonb_object_agg` in streaming mode (#12836) Signed-off-by: Richard Chien --- e2e_test/streaming/aggregate/jsonb_agg.slt | 46 +++++++++++++++++++ src/expr/core/src/aggregate/def.rs | 6 +-- .../src/optimizer/plan_node/generic/agg.rs | 18 ++++++-- src/stream/src/executor/aggregation/minput.rs | 5 +- 4 files changed, 66 insertions(+), 9 deletions(-) create mode 100644 e2e_test/streaming/aggregate/jsonb_agg.slt diff --git a/e2e_test/streaming/aggregate/jsonb_agg.slt b/e2e_test/streaming/aggregate/jsonb_agg.slt new file mode 100644 index 0000000000000..18cb80cc69085 --- /dev/null +++ b/e2e_test/streaming/aggregate/jsonb_agg.slt @@ -0,0 +1,46 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +create table t(v1 boolean, v2 int, v3 varchar, v4 jsonb); + +statement ok +create materialized view mv_tmp as +select jsonb_agg(v1) as j1 from t; + +statement ok +drop materialized view mv_tmp; + +statement ok +create materialized view mv1 as +select + jsonb_agg(v1 order by v2) as j1, + jsonb_agg(v2 order by v2) as j2, + jsonb_object_agg(v3, v4) as j3 +from t; + +statement ok +insert into t values + (null, 2, 'bbb', null), + (false, 1, 'ccc', 'null'); + +query TTT +select * from mv1; +---- +[false, null] [1, 2] {"bbb": null, "ccc": null} + +statement ok +insert into t values + (true, 0, 'bbb', '999'), + (true, 8, 'ddd', '{"foo": "bar"}'); + +query TTT +select * from mv1; +---- +[true, false, null, true] [0, 1, 2, 8] {"bbb": 999, "ccc": null, "ddd": {"foo": "bar"}} + +statement ok +drop materialized view mv1; + +statement ok +drop table t; diff --git a/src/expr/core/src/aggregate/def.rs b/src/expr/core/src/aggregate/def.rs index 39d4c158c10d7..f71bfd454a415 100644 --- a/src/expr/core/src/aggregate/def.rs +++ b/src/expr/core/src/aggregate/def.rs @@ -308,11 +308,7 @@ pub mod agg_kinds { #[macro_export] macro_rules! unimplemented_in_stream { () => { - AggKind::JsonbAgg - | AggKind::JsonbObjectAgg - | AggKind::PercentileCont - | AggKind::PercentileDisc - | AggKind::Mode + AggKind::PercentileCont | AggKind::PercentileDisc | AggKind::Mode }; } pub use unimplemented_in_stream; diff --git a/src/frontend/src/optimizer/plan_node/generic/agg.rs b/src/frontend/src/optimizer/plan_node/generic/agg.rs index 107eec5e51b01..2fb251ca89aa6 100644 --- a/src/frontend/src/optimizer/plan_node/generic/agg.rs +++ b/src/frontend/src/optimizer/plan_node/generic/agg.rs @@ -412,7 +412,9 @@ impl Agg { | AggKind::FirstValue | AggKind::LastValue | AggKind::StringAgg - | AggKind::ArrayAgg => { + | AggKind::ArrayAgg + | AggKind::JsonbAgg + | AggKind::JsonbObjectAgg => { // columns with order requirement in state table let sort_keys = { match agg_call.agg_kind { @@ -425,7 +427,8 @@ impl Agg { AggKind::FirstValue | AggKind::LastValue | AggKind::StringAgg - | AggKind::ArrayAgg => { + | AggKind::ArrayAgg + | AggKind::JsonbAgg => { if agg_call.order_by.is_empty() { me.ctx().warn_to_user(format!( "{} without ORDER BY may produce non-deterministic result", @@ -447,6 +450,11 @@ impl Agg { }) .collect() } + AggKind::JsonbObjectAgg => agg_call + .order_by + .iter() + .map(|o| (o.order_type, o.column_index)) + .collect(), _ => unreachable!(), } }; @@ -455,7 +463,11 @@ impl Agg { AggKind::FirstValue | AggKind::LastValue | AggKind::StringAgg - | AggKind::ArrayAgg => agg_call.inputs.iter().map(|i| i.index).collect(), + | AggKind::ArrayAgg + | AggKind::JsonbAgg + | AggKind::JsonbObjectAgg => { + agg_call.inputs.iter().map(|i| i.index).collect() + } _ => vec![], }; let state = gen_materialized_input_state(sort_keys, include_keys); diff --git a/src/stream/src/executor/aggregation/minput.rs b/src/stream/src/executor/aggregation/minput.rs index 78c7d484385e8..1329f08eb6d99 100644 --- a/src/stream/src/executor/aggregation/minput.rs +++ b/src/stream/src/executor/aggregation/minput.rs @@ -139,7 +139,10 @@ impl MaterializedInputState { agg_call.args.arg_types(), )) } - AggKind::StringAgg | AggKind::ArrayAgg => Box::new(GenericAggStateCache::new( + AggKind::StringAgg + | AggKind::ArrayAgg + | AggKind::JsonbAgg + | AggKind::JsonbObjectAgg => Box::new(GenericAggStateCache::new( OrderedStateCache::new(), agg_call.args.arg_types(), )),