feat(stream): support row merge (a.k.a keyed merge) (#17930)
kwannoel authored Aug 8, 2024
1 parent fbaed06 commit f5f5701
Showing 11 changed files with 397 additions and 53 deletions.
93 changes: 93 additions & 0 deletions e2e_test/streaming/aggregate/two_phase_approx_percentile_merge.slt
@@ -0,0 +1,93 @@
# Single phase approx percentile
statement ok
create table t(p_col double, grp_col int);

statement ok
insert into t select a, 1 from generate_series(-1000, 1000) t(a);

statement ok
flush;

query I
select
percentile_cont(0.01) within group (order by p_col) as p01,
percentile_cont(0.1) within group (order by p_col) as p10,
percentile_cont(0.5) within group (order by p_col) as p50,
percentile_cont(0.9) within group (order by p_col) as p90,
percentile_cont(0.99) within group (order by p_col) as p99
from t;
----
-980 -800 0 800 980

statement ok
create materialized view m1 as
select
approx_percentile(0.01, 0.01) within group (order by p_col) as p01,
approx_percentile(0.1, 0.01) within group (order by p_col) as p10,
approx_percentile(0.5, 0.01) within group (order by p_col) as p50,
approx_percentile(0.9, 0.01) within group (order by p_col) as p90,
approx_percentile(0.99, 0.01) within group (order by p_col) as p99
from t;

query I
select * from m1;
----
-982.5779489474152 -804.4614206837127 0 804.4614206837127 982.5779489474152

# Test state encode / decode
onlyif can-use-recover
statement ok
recover;

onlyif can-use-recover
sleep 10s

query I
select * from m1;
----
-982.5779489474152 -804.4614206837127 0 804.4614206837127 982.5779489474152

# Test state encode / decode
onlyif can-use-recover
statement ok
recover;

onlyif can-use-recover
sleep 10s

query I
select * from m1;
----
-982.5779489474152 -804.4614206837127 0 804.4614206837127 982.5779489474152

# Test 0<x<1 values
statement ok
insert into t select 0.001, 1 from generate_series(1, 500);

statement ok
insert into t select 0.0001, 1 from generate_series(1, 501);

statement ok
flush;

query I
select * from m1;
----
-963.1209598593477 -699.3618972397041 0.00009999833511933609 699.3618972397041 963.1209598593477

query I
select
percentile_cont(0.01) within group (order by p_col) as p01,
percentile_cont(0.1) within group (order by p_col) as p10,
percentile_cont(0.5) within group (order by p_col) as p50,
percentile_cont(0.9) within group (order by p_col) as p90,
percentile_cont(0.99) within group (order by p_col) as p99
from t;
----
-969.99 -699.9 0.0001 699.9000000000001 969.9899999999998

statement ok
drop materialized view m1;

statement ok
drop table t;
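
A side note on the expected results above: with relative_error = 0.01, every approx_percentile value returned by m1 stays within 1% of the corresponding percentile_cont result (for example -982.578 vs. the exact -980). Below is a minimal Rust sketch of that check using the numbers from the first two queries; the helper, and the assumption that the accuracy guarantee is a relative bound on the returned value, are illustrative and not part of this commit.

// Hypothetical check (not part of this PR): each approx_percentile output from m1
// should fall within `relative_error` of the exact percentile_cont result.
fn within_relative_error(exact: f64, approx: f64, relative_error: f64) -> bool {
    (approx - exact).abs() <= relative_error * exact.abs()
}

fn main() {
    let relative_error = 0.01; // second argument of approx_percentile in the test
    let pairs = [
        (-980.0, -982.5779489474152), // p01
        (-800.0, -804.4614206837127), // p10
        (0.0, 0.0),                   // p50
        (800.0, 804.4614206837127),   // p90
        (980.0, 982.5779489474152),   // p99
    ];
    for (exact, approx) in pairs {
        assert!(within_relative_error(exact, approx, relative_error));
    }
}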
6 changes: 6 additions & 0 deletions proto/stream_plan.proto
@@ -800,6 +800,11 @@ message GlobalApproxPercentileNode {
catalog.Table count_state_table = 4;
}

message RowMergeNode {
catalog.ColIndexMapping lhs_mapping = 1;
catalog.ColIndexMapping rhs_mapping = 2;
}

message StreamNode {
oneof node_body {
SourceNode source = 100;
@@ -847,6 +852,7 @@ message StreamNode {
ChangeLogNode changelog = 143;
LocalApproxPercentileNode local_approx_percentile = 144;
GlobalApproxPercentileNode global_approx_percentile = 145;
RowMergeNode row_merge = 146;
}
// The id for the operator. This is local per mview.
// TODO: should better be a uint32.
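In this node, `lhs_mapping` and `rhs_mapping` describe where each input's columns land in the merged output row; the planner builds them from `ColIndexMapping` (see logical_agg.rs further down). The following is a minimal, hypothetical Rust sketch of that stitching — the `Mapping` alias and `merge_rows` helper are simplifications for illustration, not the executor this PR adds.

// Illustrative sketch only: merging two input rows into one output row under the
// two column-index mappings carried by `RowMergeNode`. A mapping is simplified to
// `Vec<Option<usize>>`, indexed by input column, holding the target output column --
// the same shape as `ColIndexMapping::new(vec![Some(current_size)], new_size)`.
type Mapping = Vec<Option<usize>>;

fn merge_rows(
    lhs: &[Option<i64>],
    rhs: &[Option<i64>],
    lhs_mapping: &Mapping,
    rhs_mapping: &Mapping,
    output_len: usize,
) -> Vec<Option<i64>> {
    let mut out = vec![None; output_len];
    for (src, dst) in lhs_mapping.iter().enumerate() {
        if let Some(dst) = dst {
            out[*dst] = lhs[src];
        }
    }
    for (src, dst) in rhs_mapping.iter().enumerate() {
        if let Some(dst) = dst {
            out[*dst] = rhs[src];
        }
    }
    out
}

fn main() {
    // Roughly mirrors the planner tests: left row carries the approx percentile,
    // right row carries the global simple-agg results.
    let lhs_mapping: Mapping = vec![Some(0)];
    let rhs_mapping: Mapping = vec![Some(1), Some(2)];
    let merged = merge_rows(&[Some(42)], &[Some(100), Some(7)], &lhs_mapping, &rhs_mapping, 3);
    assert_eq!(merged, vec![Some(42), Some(100), Some(7)]);
}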
35 changes: 14 additions & 21 deletions src/frontend/planner_test/tests/testdata/output/agg.yaml
@@ -1906,7 +1906,7 @@
└─LogicalScan { table: t, columns: [t.v1, t._row_id] }
stream_plan: |-
StreamMaterialize { columns: [approx_percentile, sum], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamKeyedMerge { output: [approx_percentile:Float64, sum(sum(t.v1)):Int64] }
└─StreamRowMerge { output: [approx_percentile:Float64, sum(sum(t.v1)):Int64] }
├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ └─StreamExchange { dist: Single }
│ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 }
@@ -1931,7 +1931,7 @@
stream_plan: |-
StreamMaterialize { columns: [s1, approx_percentile, s2, count], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamProject { exprs: [sum(sum(t.v1)), approx_percentile, sum(sum(t.v1)), sum0(count(t.v1))] }
└─StreamKeyedMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64, sum0(count(t.v1)):Int64] }
└─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64, sum0(count(t.v1)):Int64] }
├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ └─StreamExchange { dist: Single }
│ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 }
@@ -1972,23 +1972,16 @@
└─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] }
stream_plan: |-
StreamMaterialize { columns: [x, y], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamKeyedMerge { output: [approx_percentile:Float64, approx_percentile:Float64] }
├─StreamKeyedMerge { output: [approx_percentile:Float64, approx_percentile:Float64] }
│ ├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ │ └─StreamExchange { dist: Single }
│ │ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ │ └─StreamShare { id: 2 }
│ │ └─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v2::Float64 as $expr2, t._row_id] }
│ │ └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
│ └─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ └─StreamExchange { dist: Single }
│ └─StreamLocalApproxPercentile { percentile_col: $expr2, quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ └─StreamShare { id: 2 }
│ └─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v2::Float64 as $expr2, t._row_id] }
│ └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamSimpleAgg { aggs: [count] }
└─StreamRowMerge { output: [approx_percentile:Float64, approx_percentile:Float64] }
├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ └─StreamExchange { dist: Single }
│ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ └─StreamShare { id: 2 }
│ └─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v2::Float64 as $expr2, t._row_id] }
│ └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }
└─StreamExchange { dist: Single }
└─StreamStatelessSimpleAgg { aggs: [] }
└─StreamLocalApproxPercentile { percentile_col: $expr2, quantile: 0.5:Float64, relative_error: 0.01:Float64 }
└─StreamShare { id: 2 }
└─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v2::Float64 as $expr2, t._row_id] }
└─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
@@ -2003,8 +1996,8 @@
└─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] }
stream_plan: |-
StreamMaterialize { columns: [s1, x, count, s2, y], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamKeyedMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64, sum0(count):Int64, sum(sum(t.v2)):Int64, approx_percentile:Float64] }
├─StreamKeyedMerge { output: [approx_percentile:Float64, approx_percentile:Float64] }
└─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64, sum0(count):Int64, sum(sum(t.v2)):Int64, approx_percentile:Float64] }
├─StreamRowMerge { output: [approx_percentile:Float64, approx_percentile:Float64] }
│ ├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ │ └─StreamExchange { dist: Single }
│ │ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 }
@@ -2034,7 +2027,7 @@
└─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] }
stream_plan: |-
StreamMaterialize { columns: [s1, approx_percentile], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamKeyedMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64] }
└─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64] }
├─StreamGlobalApproxPercentile { quantile: 0.8:Float64, relative_error: 0.01:Float64 }
│ └─StreamExchange { dist: Single }
│ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.8:Float64, relative_error: 0.01:Float64 }
40 changes: 23 additions & 17 deletions src/frontend/src/optimizer/plan_node/logical_agg.rs
@@ -16,7 +16,7 @@ use fixedbitset::FixedBitSet;
use itertools::Itertools;
use risingwave_common::types::{DataType, Datum, ScalarImpl};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_common::{bail_not_implemented, not_implemented};
use risingwave_common::{bail, bail_not_implemented, not_implemented};
use risingwave_expr::aggregate::{agg_kinds, AggKind, PbAggKind};

use super::generic::{self, Agg, GenericPlanRef, PlanAggCall, ProjectBuilder};
@@ -34,8 +34,8 @@ use crate::expr::{
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::generic::GenericPlanNode;
use crate::optimizer::plan_node::stream_global_approx_percentile::StreamGlobalApproxPercentile;
use crate::optimizer::plan_node::stream_keyed_merge::StreamKeyedMerge;
use crate::optimizer::plan_node::stream_local_approx_percentile::StreamLocalApproxPercentile;
use crate::optimizer::plan_node::stream_row_merge::StreamRowMerge;
use crate::optimizer::plan_node::{
gen_filter_and_pushdown, BatchSortAgg, ColumnPruningContext, LogicalDedup, LogicalProject,
PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
@@ -87,11 +87,11 @@ impl LogicalAgg {
col_mapping: approx_percentile_col_mapping,
} = approx;

let needs_keyed_merge = (!non_approx_percentile_agg_calls.is_empty()
let needs_row_merge = (!non_approx_percentile_agg_calls.is_empty()
&& !approx_percentile_agg_calls.is_empty())
|| approx_percentile_agg_calls.len() >= 2;
core.input = if needs_keyed_merge {
// If there's keyed merge, we need to share the input.
core.input = if needs_row_merge {
// If there's row merge, we need to share the input.
StreamShare::new_from_input(stream_input.clone()).into()
} else {
stream_input
@@ -102,6 +102,12 @@
self.build_approx_percentile_aggs(core.input.clone(), &approx_percentile_agg_calls)?;

// ====== Handle normal aggs
if core.agg_calls.is_empty() {
if let Some(approx_percentile) = approx_percentile {
return Ok(approx_percentile);
};
bail!("expected at least one agg call");
}
let total_agg_calls = core
.agg_calls
.iter()
@@ -118,14 +124,14 @@

// ====== Merge approx percentile and normal aggs
if let Some(approx_percentile) = approx_percentile {
if needs_keyed_merge {
let keyed_merge = StreamKeyedMerge::new(
if needs_row_merge {
let row_merge = StreamRowMerge::new(
approx_percentile,
global_agg.into(),
approx_percentile_col_mapping,
non_approx_percentile_col_mapping,
)?;
Ok(keyed_merge.into())
Ok(row_merge.into())
} else {
Ok(approx_percentile)
}
@@ -345,15 +351,15 @@ impl LogicalAgg {
}

/// If only 1 approx percentile, just return it.
/// Otherwise build a tree of approx percentile with `KeyedMerge`.
/// Otherwise build a tree of approx percentile with `MergeProject`.
/// e.g.
/// ApproxPercentile(col1, 0.5) as x,
/// ApproxPercentile(col2, 0.5) as y,
/// ApproxPercentile(col3, 0.5) as z
/// will be built as
/// `KeyedMerge`
/// `MergeProject`
/// / \
/// `KeyedMerge` z
/// `MergeProject` z
/// / \
/// x y
@@ -374,14 +380,14 @@
let mut acc = iter.next().unwrap();
for (current_size, plan) in iter.enumerate().map(|(i, p)| (i + 1, p)) {
let new_size = current_size + 1;
let keyed_merge = StreamKeyedMerge::new(
let row_merge = StreamRowMerge::new(
acc,
plan,
ColIndexMapping::identity_or_none(current_size, new_size),
ColIndexMapping::new(vec![Some(current_size)], new_size),
)
.expect("failed to build keyed merge");
acc = keyed_merge.into();
.expect("failed to build row merge");
acc = row_merge.into();
}
Ok(Some(acc))
}
@@ -1312,17 +1318,17 @@ impl ToStream for LogicalAgg {
.into());
}
(plan.clone(), 1)
} else if let Some(stream_keyed_merge) = plan.as_stream_keyed_merge() {
} else if let Some(stream_row_merge) = plan.as_stream_row_merge() {
if eowc {
return Err(ErrorCode::InvalidInputSyntax(
"`EMIT ON WINDOW CLOSE` cannot be used for aggregation without `GROUP BY`"
.to_string(),
)
.into());
}
(plan.clone(), stream_keyed_merge.base.schema().len())
(plan.clone(), stream_row_merge.base.schema().len())
} else {
panic!("the root PlanNode must be StreamHashAgg, StreamSimpleAgg, StreamGlobalApproxPercentile, or StreamKeyedMerge");
panic!("the root PlanNode must be StreamHashAgg, StreamSimpleAgg, StreamGlobalApproxPercentile, or StreamRowMerge");
};

if self.agg_calls().len() == n_final_agg_calls {
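The fold in `build_approx_percentile_aggs` above grows a left-deep tree: at each step the accumulated side keeps its columns in place via `ColIndexMapping::identity_or_none(current_size, new_size)`, while the newly merged percentile is appended at position `current_size` via `ColIndexMapping::new(vec![Some(current_size)], new_size)`. Here is a small hypothetical sketch of how those mapping shapes evolve for the x/y/z example in the doc comment; the `identity_or_none` helper below is a simplified stand-in for `ColIndexMapping`, used only for illustration.

// Simplified model of the mappings produced by the fold for three
// approx_percentile calls x, y, z (final output columns [x, y, z]).
fn identity_or_none(source_len: usize) -> Vec<Option<usize>> {
    // source column i keeps output position i
    (0..source_len).map(Some).collect()
}

fn main() {
    // Step 1: acc = x (1 column), merge y -> output [x, y]
    let lhs1 = identity_or_none(1);   // [Some(0)]
    let rhs1 = vec![Some(1usize)];    // y appended at position 1
    // Step 2: acc = [x, y] (2 columns), merge z -> output [x, y, z]
    let lhs2 = identity_or_none(2);   // [Some(0), Some(1)]
    let rhs2 = vec![Some(2usize)];    // z appended at position 2
    println!("step 1: lhs={lhs1:?} rhs={rhs1:?}");
    println!("step 2: lhs={lhs2:?} rhs={rhs2:?}");
}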
8 changes: 4 additions & 4 deletions src/frontend/src/optimizer/plan_node/mod.rs
@@ -898,14 +898,14 @@ mod stream_group_topn;
mod stream_hash_agg;
mod stream_hash_join;
mod stream_hop_window;
mod stream_keyed_merge;
mod stream_local_approx_percentile;
mod stream_materialize;
mod stream_now;
mod stream_over_window;
mod stream_project;
mod stream_project_set;
mod stream_row_id_gen;
mod stream_row_merge;
mod stream_simple_agg;
mod stream_sink;
mod stream_sort;
@@ -1010,14 +1010,14 @@ pub use stream_group_topn::StreamGroupTopN;
pub use stream_hash_agg::StreamHashAgg;
pub use stream_hash_join::StreamHashJoin;
pub use stream_hop_window::StreamHopWindow;
pub use stream_keyed_merge::StreamKeyedMerge;
pub use stream_local_approx_percentile::StreamLocalApproxPercentile;
pub use stream_materialize::StreamMaterialize;
pub use stream_now::StreamNow;
pub use stream_over_window::StreamOverWindow;
pub use stream_project::StreamProject;
pub use stream_project_set::StreamProjectSet;
pub use stream_row_id_gen::StreamRowIdGen;
pub use stream_row_merge::StreamRowMerge;
pub use stream_share::StreamShare;
pub use stream_simple_agg::StreamSimpleAgg;
pub use stream_sink::{IcebergPartitionInfo, PartitionComputeInfo, StreamSink};
@@ -1158,7 +1158,7 @@ macro_rules! for_all_plan_nodes {
, { Stream, ChangeLog }
, { Stream, GlobalApproxPercentile }
, { Stream, LocalApproxPercentile }
, { Stream, KeyedMerge }
, { Stream, RowMerge }
}
};
}
@@ -1287,7 +1287,7 @@ macro_rules! for_stream_plan_nodes {
, { Stream, ChangeLog }
, { Stream, GlobalApproxPercentile }
, { Stream, LocalApproxPercentile }
, { Stream, KeyedMerge }
, { Stream, RowMerge }
}
};
}
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_exchange.rs
@@ -43,6 +43,7 @@ impl StreamExchange {
} else {
MonotonicityMap::new()
};
assert!(!input.schema().is_empty());
let base = PlanBase::new_stream(
input.ctx(),
input.schema().clone(),