Skip to content

Commit

Permalink
feat(agg): support jsonb_agg and jsonb_object_agg in streaming mo…
Browse files Browse the repository at this point in the history
…de (#12836)

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Oct 16, 2023
1 parent 58e6f9d commit f8de4a9
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 9 deletions.
46 changes: 46 additions & 0 deletions e2e_test/streaming/aggregate/jsonb_agg.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t(v1 boolean, v2 int, v3 varchar, v4 jsonb);

statement ok
create materialized view mv_tmp as
select jsonb_agg(v1) as j1 from t;

statement ok
drop materialized view mv_tmp;

statement ok
create materialized view mv1 as
select
jsonb_agg(v1 order by v2) as j1,
jsonb_agg(v2 order by v2) as j2,
jsonb_object_agg(v3, v4) as j3
from t;

statement ok
insert into t values
(null, 2, 'bbb', null),
(false, 1, 'ccc', 'null');

query TTT
select * from mv1;
----
[false, null] [1, 2] {"bbb": null, "ccc": null}

statement ok
insert into t values
(true, 0, 'bbb', '999'),
(true, 8, 'ddd', '{"foo": "bar"}');

query TTT
select * from mv1;
----
[true, false, null, true] [0, 1, 2, 8] {"bbb": 999, "ccc": null, "ddd": {"foo": "bar"}}

statement ok
drop materialized view mv1;

statement ok
drop table t;
6 changes: 1 addition & 5 deletions src/expr/core/src/aggregate/def.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,11 +308,7 @@ pub mod agg_kinds {
#[macro_export]
macro_rules! unimplemented_in_stream {
() => {
AggKind::JsonbAgg
| AggKind::JsonbObjectAgg
| AggKind::PercentileCont
| AggKind::PercentileDisc
| AggKind::Mode
AggKind::PercentileCont | AggKind::PercentileDisc | AggKind::Mode
};
}
pub use unimplemented_in_stream;
Expand Down
18 changes: 15 additions & 3 deletions src/frontend/src/optimizer/plan_node/generic/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,9 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
| AggKind::FirstValue
| AggKind::LastValue
| AggKind::StringAgg
| AggKind::ArrayAgg => {
| AggKind::ArrayAgg
| AggKind::JsonbAgg
| AggKind::JsonbObjectAgg => {
// columns with order requirement in state table
let sort_keys = {
match agg_call.agg_kind {
Expand All @@ -425,7 +427,8 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
AggKind::FirstValue
| AggKind::LastValue
| AggKind::StringAgg
| AggKind::ArrayAgg => {
| AggKind::ArrayAgg
| AggKind::JsonbAgg => {
if agg_call.order_by.is_empty() {
me.ctx().warn_to_user(format!(
"{} without ORDER BY may produce non-deterministic result",
Expand All @@ -447,6 +450,11 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
})
.collect()
}
AggKind::JsonbObjectAgg => agg_call
.order_by
.iter()
.map(|o| (o.order_type, o.column_index))
.collect(),
_ => unreachable!(),
}
};
Expand All @@ -455,7 +463,11 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
AggKind::FirstValue
| AggKind::LastValue
| AggKind::StringAgg
| AggKind::ArrayAgg => agg_call.inputs.iter().map(|i| i.index).collect(),
| AggKind::ArrayAgg
| AggKind::JsonbAgg
| AggKind::JsonbObjectAgg => {
agg_call.inputs.iter().map(|i| i.index).collect()
}
_ => vec![],
};
let state = gen_materialized_input_state(sort_keys, include_keys);
Expand Down
5 changes: 4 additions & 1 deletion src/stream/src/executor/aggregation/minput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,10 @@ impl MaterializedInputState {
agg_call.args.arg_types(),
))
}
AggKind::StringAgg | AggKind::ArrayAgg => Box::new(GenericAggStateCache::new(
AggKind::StringAgg
| AggKind::ArrayAgg
| AggKind::JsonbAgg
| AggKind::JsonbObjectAgg => Box::new(GenericAggStateCache::new(
OrderedStateCache::new(),
agg_call.args.arg_types(),
)),
Expand Down

0 comments on commit f8de4a9

Please sign in to comment.