Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stream): support row merge (a.k.a keyed merge) #17930

Merged
merged 33 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
1be08ab
support local stateless approx percentile
kwannoel Jul 30, 2024
3d8f85b
handle chunk
kwannoel Aug 1, 2024
1d2626d
handle barrier
kwannoel Aug 1, 2024
c8c900f
add local approx percentile proto
kwannoel Aug 1, 2024
1170acc
from_proto for global
kwannoel Aug 1, 2024
19060d6
convert plans to proto
kwannoel Aug 1, 2024
48b3b46
fmt
kwannoel Aug 1, 2024
0bd820d
defer keyed merge
kwannoel Aug 1, 2024
356763b
interim commit: adding tests but failing
kwannoel Aug 1, 2024
a437469
revert some debug in global
kwannoel Aug 2, 2024
768a70a
minor
kwannoel Aug 2, 2024
5b0af78
add more test, fix bugs in calculating percentile
kwannoel Aug 2, 2024
c362ed2
support negative, but needs some fixes still, specifically we need to…
kwannoel Aug 2, 2024
b2d92ba
properly handle neg
kwannoel Aug 2, 2024
5178950
revert debug stmts
kwannoel Aug 2, 2024
c134477
remove some fixme
kwannoel Aug 2, 2024
dbe8018
fmt
kwannoel Aug 2, 2024
a4d68b3
more fmt
kwannoel Aug 2, 2024
6c93832
drop table and mv
kwannoel Aug 2, 2024
84c656f
fix comments
kwannoel Aug 6, 2024
66ecdea
ignore watermarks
kwannoel Aug 7, 2024
8b2619c
rename
kwannoel Aug 5, 2024
863316c
implement merge project
kwannoel Aug 5, 2024
dc3ed93
finish impl
kwannoel Aug 5, 2024
e3d23bb
add test + fix bugs
kwannoel Aug 5, 2024
b4f4cfe
fmt
kwannoel Aug 6, 2024
264ef05
rename MergeProject to RowMerge
kwannoel Aug 6, 2024
9df0692
dapt
kwannoel Aug 6, 2024
b7edcba
fix
kwannoel Aug 6, 2024
51cdb17
fix
kwannoel Aug 6, 2024
677edab
handle multiple inputs in one epoch
kwannoel Aug 8, 2024
a4cd962
Merge branch 'main' into kwannoel/keyed-merge
kwannoel Aug 8, 2024
ebf0545
address review comments
kwannoel Aug 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions e2e_test/streaming/aggregate/two_phase_approx_percentile_merge.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# Two-phase (local + global) approx percentile, with several approx_percentile calls merged into one output row
statement ok
create table t(p_col double, grp_col int);

statement ok
insert into t select a, 1 from generate_series(-1000, 1000) t(a);

statement ok
flush;

query I
select
percentile_cont(0.01) within group (order by p_col) as p01,
percentile_cont(0.1) within group (order by p_col) as p10,
percentile_cont(0.5) within group (order by p_col) as p50,
percentile_cont(0.9) within group (order by p_col) as p90,
percentile_cont(0.99) within group (order by p_col) as p99
from t;
----
-980 -800 0 800 980

statement ok
create materialized view m1 as
select
approx_percentile(0.01, 0.01) within group (order by p_col) as p01,
approx_percentile(0.1, 0.01) within group (order by p_col) as p10,
approx_percentile(0.5, 0.01) within group (order by p_col) as p50,
approx_percentile(0.9, 0.01) within group (order by p_col) as p90,
approx_percentile(0.99, 0.01) within group (order by p_col) as p99
from t;

query I
select * from m1;
----
-982.5779489474152 -804.4614206837127 0 804.4614206837127 982.5779489474152

# Test state encode / decode
onlyif can-use-recover
statement ok
recover;

onlyif can-use-recover
sleep 10s

query I
select * from m1;
----
-982.5779489474152 -804.4614206837127 0 804.4614206837127 982.5779489474152

# Test state encode / decode again: a second recovery must yield the same result
onlyif can-use-recover
statement ok
recover;

onlyif can-use-recover
sleep 10s

query I
select * from m1;
----
-982.5779489474152 -804.4614206837127 0 804.4614206837127 982.5779489474152

# Test 0<x<1 values
statement ok
insert into t select 0.001, 1 from generate_series(1, 500);

statement ok
insert into t select 0.0001, 1 from generate_series(1, 501);

statement ok
flush;

query I
select * from m1;
----
-963.1209598593477 -699.3618972397041 0.00009999833511933609 699.3618972397041 963.1209598593477

query I
select
percentile_cont(0.01) within group (order by p_col) as p01,
percentile_cont(0.1) within group (order by p_col) as p10,
percentile_cont(0.5) within group (order by p_col) as p50,
percentile_cont(0.9) within group (order by p_col) as p90,
percentile_cont(0.99) within group (order by p_col) as p99
from t;
----
-969.99 -699.9 0.0001 699.9000000000001 969.9899999999998

statement ok
drop materialized view m1;

statement ok
drop table t;
6 changes: 6 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,11 @@ message GlobalApproxPercentileNode {
catalog.Table count_state_table = 4;
}

// Plan node body for the row merge (a.k.a. keyed merge) stream operator.
// The frontend builds it from two inputs (lhs, rhs) and one column index
// mapping per input.
// NOTE(review): presumably each mapping sends a column index of its input to
// the position of that column in the merged output row — confirm against the
// executor implementation.
message RowMergeNode {
// Column index mapping applied to the left-hand-side input.
catalog.ColIndexMapping lhs_mapping = 1;
// Column index mapping applied to the right-hand-side input.
catalog.ColIndexMapping rhs_mapping = 2;
}

message StreamNode {
oneof node_body {
SourceNode source = 100;
Expand Down Expand Up @@ -847,6 +852,7 @@ message StreamNode {
ChangeLogNode changelog = 143;
LocalApproxPercentileNode local_approx_percentile = 144;
GlobalApproxPercentileNode global_approx_percentile = 145;
RowMergeNode row_merge = 146;
}
// The id for the operator. This is local per mview.
// TODO: should better be a uint32.
Expand Down
35 changes: 14 additions & 21 deletions src/frontend/planner_test/tests/testdata/output/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1906,7 +1906,7 @@
└─LogicalScan { table: t, columns: [t.v1, t._row_id] }
stream_plan: |-
StreamMaterialize { columns: [approx_percentile, sum], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamKeyedMerge { output: [approx_percentile:Float64, sum(sum(t.v1)):Int64] }
└─StreamRowMerge { output: [approx_percentile:Float64, sum(sum(t.v1)):Int64] }
├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ └─StreamExchange { dist: Single }
│ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 }
Expand All @@ -1931,7 +1931,7 @@
stream_plan: |-
StreamMaterialize { columns: [s1, approx_percentile, s2, count], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamProject { exprs: [sum(sum(t.v1)), approx_percentile, sum(sum(t.v1)), sum0(count(t.v1))] }
└─StreamKeyedMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64, sum0(count(t.v1)):Int64] }
└─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64, sum0(count(t.v1)):Int64] }
├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ └─StreamExchange { dist: Single }
│ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 }
Expand Down Expand Up @@ -1972,23 +1972,16 @@
└─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] }
stream_plan: |-
StreamMaterialize { columns: [x, y], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamKeyedMerge { output: [approx_percentile:Float64, approx_percentile:Float64] }
├─StreamKeyedMerge { output: [approx_percentile:Float64, approx_percentile:Float64] }
│ ├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ │ └─StreamExchange { dist: Single }
│ │ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ │ └─StreamShare { id: 2 }
│ │ └─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v2::Float64 as $expr2, t._row_id] }
│ │ └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
│ └─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ └─StreamExchange { dist: Single }
│ └─StreamLocalApproxPercentile { percentile_col: $expr2, quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ └─StreamShare { id: 2 }
│ └─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v2::Float64 as $expr2, t._row_id] }
│ └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamSimpleAgg { aggs: [count] }
└─StreamRowMerge { output: [approx_percentile:Float64, approx_percentile:Float64] }
├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ └─StreamExchange { dist: Single }
│ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ └─StreamShare { id: 2 }
│ └─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v2::Float64 as $expr2, t._row_id] }
│ └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }
└─StreamExchange { dist: Single }
└─StreamStatelessSimpleAgg { aggs: [] }
└─StreamLocalApproxPercentile { percentile_col: $expr2, quantile: 0.5:Float64, relative_error: 0.01:Float64 }
└─StreamShare { id: 2 }
└─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v2::Float64 as $expr2, t._row_id] }
└─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
Expand All @@ -2003,8 +1996,8 @@
└─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] }
stream_plan: |-
StreamMaterialize { columns: [s1, x, count, s2, y], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamKeyedMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64, sum0(count):Int64, sum(sum(t.v2)):Int64, approx_percentile:Float64] }
├─StreamKeyedMerge { output: [approx_percentile:Float64, approx_percentile:Float64] }
└─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64, sum0(count):Int64, sum(sum(t.v2)):Int64, approx_percentile:Float64] }
├─StreamRowMerge { output: [approx_percentile:Float64, approx_percentile:Float64] }
│ ├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ │ └─StreamExchange { dist: Single }
│ │ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 }
Expand Down Expand Up @@ -2034,7 +2027,7 @@
└─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] }
stream_plan: |-
StreamMaterialize { columns: [s1, approx_percentile], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamKeyedMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64] }
└─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64] }
├─StreamGlobalApproxPercentile { quantile: 0.8:Float64, relative_error: 0.01:Float64 }
│ └─StreamExchange { dist: Single }
│ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.8:Float64, relative_error: 0.01:Float64 }
Expand Down
40 changes: 23 additions & 17 deletions src/frontend/src/optimizer/plan_node/logical_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use fixedbitset::FixedBitSet;
use itertools::Itertools;
use risingwave_common::types::{DataType, Datum, ScalarImpl};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_common::{bail_not_implemented, not_implemented};
use risingwave_common::{bail, bail_not_implemented, not_implemented};
use risingwave_expr::aggregate::{agg_kinds, AggKind, PbAggKind};

use super::generic::{self, Agg, GenericPlanRef, PlanAggCall, ProjectBuilder};
Expand All @@ -34,8 +34,8 @@ use crate::expr::{
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::generic::GenericPlanNode;
use crate::optimizer::plan_node::stream_global_approx_percentile::StreamGlobalApproxPercentile;
use crate::optimizer::plan_node::stream_keyed_merge::StreamKeyedMerge;
use crate::optimizer::plan_node::stream_local_approx_percentile::StreamLocalApproxPercentile;
use crate::optimizer::plan_node::stream_row_merge::StreamRowMerge;
use crate::optimizer::plan_node::{
gen_filter_and_pushdown, BatchSortAgg, ColumnPruningContext, LogicalDedup, LogicalProject,
PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
Expand Down Expand Up @@ -87,11 +87,11 @@ impl LogicalAgg {
col_mapping: approx_percentile_col_mapping,
} = approx;

let needs_keyed_merge = (!non_approx_percentile_agg_calls.is_empty()
let needs_row_merge = (!non_approx_percentile_agg_calls.is_empty()
&& !approx_percentile_agg_calls.is_empty())
|| approx_percentile_agg_calls.len() >= 2;
core.input = if needs_keyed_merge {
// If there's keyed merge, we need to share the input.
core.input = if needs_row_merge {
// If there's row merge, we need to share the input.
StreamShare::new_from_input(stream_input.clone()).into()
} else {
stream_input
Expand All @@ -102,6 +102,12 @@ impl LogicalAgg {
self.build_approx_percentile_aggs(core.input.clone(), &approx_percentile_agg_calls)?;

// ====== Handle normal aggs
if core.agg_calls.is_empty() {
if let Some(approx_percentile) = approx_percentile {
return Ok(approx_percentile);
};
bail!("expected at least one agg call");
}
let total_agg_calls = core
.agg_calls
.iter()
Expand All @@ -118,14 +124,14 @@ impl LogicalAgg {

// ====== Merge approx percentile and normal aggs
if let Some(approx_percentile) = approx_percentile {
if needs_keyed_merge {
let keyed_merge = StreamKeyedMerge::new(
if needs_row_merge {
let row_merge = StreamRowMerge::new(
approx_percentile,
global_agg.into(),
approx_percentile_col_mapping,
non_approx_percentile_col_mapping,
)?;
Ok(keyed_merge.into())
Ok(row_merge.into())
} else {
Ok(approx_percentile)
}
Expand Down Expand Up @@ -345,15 +351,15 @@ impl LogicalAgg {
}

/// If only 1 approx percentile, just return it.
/// Otherwise build a tree of approx percentile with `KeyedMerge`.
/// Otherwise build a tree of approx percentile with `RowMerge`.
/// e.g.
/// ApproxPercentile(col1, 0.5) as x,
/// ApproxPercentile(col2, 0.5) as y,
/// ApproxPercentile(col3, 0.5) as z
/// will be built as
/// `KeyedMerge`
/// `RowMerge`
/// / \
/// `KeyedMerge` z
/// `RowMerge` z
/// / \
/// x y

Expand All @@ -374,14 +380,14 @@ impl LogicalAgg {
let mut acc = iter.next().unwrap();
for (current_size, plan) in iter.enumerate().map(|(i, p)| (i + 1, p)) {
let new_size = current_size + 1;
let keyed_merge = StreamKeyedMerge::new(
let row_merge = StreamRowMerge::new(
acc,
plan,
ColIndexMapping::identity_or_none(current_size, new_size),
ColIndexMapping::new(vec![Some(current_size)], new_size),
)
.expect("failed to build keyed merge");
acc = keyed_merge.into();
.expect("failed to build row merge");
acc = row_merge.into();
}
Ok(Some(acc))
}
Expand Down Expand Up @@ -1312,17 +1318,17 @@ impl ToStream for LogicalAgg {
.into());
}
(plan.clone(), 1)
} else if let Some(stream_keyed_merge) = plan.as_stream_keyed_merge() {
} else if let Some(stream_row_merge) = plan.as_stream_row_merge() {
if eowc {
return Err(ErrorCode::InvalidInputSyntax(
"`EMIT ON WINDOW CLOSE` cannot be used for aggregation without `GROUP BY`"
.to_string(),
)
.into());
}
(plan.clone(), stream_keyed_merge.base.schema().len())
(plan.clone(), stream_row_merge.base.schema().len())
} else {
panic!("the root PlanNode must be StreamHashAgg, StreamSimpleAgg, StreamGlobalApproxPercentile, or StreamKeyedMerge");
panic!("the root PlanNode must be StreamHashAgg, StreamSimpleAgg, StreamGlobalApproxPercentile, or StreamRowMerge");
};

if self.agg_calls().len() == n_final_agg_calls {
Expand Down
8 changes: 4 additions & 4 deletions src/frontend/src/optimizer/plan_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -898,14 +898,14 @@ mod stream_group_topn;
mod stream_hash_agg;
mod stream_hash_join;
mod stream_hop_window;
mod stream_keyed_merge;
mod stream_local_approx_percentile;
mod stream_materialize;
mod stream_now;
mod stream_over_window;
mod stream_project;
mod stream_project_set;
mod stream_row_id_gen;
mod stream_row_merge;
mod stream_simple_agg;
mod stream_sink;
mod stream_sort;
Expand Down Expand Up @@ -1010,14 +1010,14 @@ pub use stream_group_topn::StreamGroupTopN;
pub use stream_hash_agg::StreamHashAgg;
pub use stream_hash_join::StreamHashJoin;
pub use stream_hop_window::StreamHopWindow;
pub use stream_keyed_merge::StreamKeyedMerge;
pub use stream_local_approx_percentile::StreamLocalApproxPercentile;
pub use stream_materialize::StreamMaterialize;
pub use stream_now::StreamNow;
pub use stream_over_window::StreamOverWindow;
pub use stream_project::StreamProject;
pub use stream_project_set::StreamProjectSet;
pub use stream_row_id_gen::StreamRowIdGen;
pub use stream_row_merge::StreamRowMerge;
pub use stream_share::StreamShare;
pub use stream_simple_agg::StreamSimpleAgg;
pub use stream_sink::{IcebergPartitionInfo, PartitionComputeInfo, StreamSink};
Expand Down Expand Up @@ -1158,7 +1158,7 @@ macro_rules! for_all_plan_nodes {
, { Stream, ChangeLog }
, { Stream, GlobalApproxPercentile }
, { Stream, LocalApproxPercentile }
, { Stream, KeyedMerge }
, { Stream, RowMerge }
}
};
}
Expand Down Expand Up @@ -1287,7 +1287,7 @@ macro_rules! for_stream_plan_nodes {
, { Stream, ChangeLog }
, { Stream, GlobalApproxPercentile }
, { Stream, LocalApproxPercentile }
, { Stream, KeyedMerge }
, { Stream, RowMerge }
}
};
}
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ impl StreamExchange {
} else {
MonotonicityMap::new()
};
assert!(!input.schema().is_empty());
let base = PlanBase::new_stream(
input.ctx(),
input.schema().clone(),
Expand Down
Loading
Loading