Skip to content

Commit

Permalink
feat(frontend): support two phase vnode based simple agg with approx_…
Browse files Browse the repository at this point in the history
…percentile (#18007)
  • Loading branch information
kwannoel authored Aug 14, 2024
1 parent 3cae1c3 commit f09f195
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 61 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Single phase approx percentile
statement ok
create table t(p_col double, grp_col int);

statement ok
insert into t select a, 1 from generate_series(-1000, 1000) t(a);

statement ok
flush;

query I
select
percentile_cont(0.01) within group (order by p_col) as p01,
min(p_col),
percentile_cont(0.5) within group (order by p_col) as p50,
count(*),
percentile_cont(0.99) within group (order by p_col) as p99
from t;
----
-980 -1000 0 2001 980

statement ok
create materialized view m1 as
select
approx_percentile(0.01, 0.01) within group (order by p_col) as p01,
min(p_col),
approx_percentile(0.5, 0.01) within group (order by p_col) as p50,
count(*),
approx_percentile(0.99, 0.01) within group (order by p_col) as p99
from t;

query I
select * from m1;
----
-982.5779489474152 -1000 0 2001 982.5779489474152

# Test state encode / decode
onlyif can-use-recover
statement ok
recover;

onlyif can-use-recover
sleep 10s

query I
select * from m1;
----
-982.5779489474152 -1000 0 2001 982.5779489474152

# Test 0<x<1 values
statement ok
insert into t select 0.001, 1 from generate_series(1, 500);

statement ok
insert into t select 0.0001, 1 from generate_series(1, 501);

statement ok
flush;

query I
select * from m1;
----
-963.1209598593477 -1000 0.00009999833511933609 3002 963.1209598593477

query I
select
percentile_cont(0.01) within group (order by p_col) as p01,
min(p_col),
percentile_cont(0.5) within group (order by p_col) as p50,
count(*),
percentile_cont(0.99) within group (order by p_col) as p99
from t;
----
-969.99 -1000 0.0001 3002 969.9899999999998

statement ok
drop materialized view m1;

statement ok
drop table t;
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,6 @@ select * from m1;
----
-982.5779489474152 0 0 2001 982.5779489474152

# Test state encode / decode
onlyif can-use-recover
statement ok
recover;

onlyif can-use-recover
sleep 10s

query I
select * from m1;
----
-982.5779489474152 0 0 2001 982.5779489474152

# Test 0<x<1 values
statement ok
insert into t select 0.001, 1 from generate_series(1, 500);
Expand Down
7 changes: 7 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1053,6 +1053,13 @@
sql: |
CREATE TABLE t (v1 int, v2 int);
SELECT sum(v1) as s1, approx_percentile(0.2, 0.01) WITHIN GROUP (order by v1 desc) from t;
expected_outputs:
- logical_plan
- stream_plan
- name: test simple approx_percentile with different approx_percentile interleaved with stateless + stateful simple aggs
sql: |
CREATE TABLE t (v1 int, v2 int);
SELECT sum(v1) as s1, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as x, count(*), max(v2) as m2, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v2) as y from t;
expected_outputs:
- logical_plan
- stream_plan
30 changes: 30 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2040,3 +2040,33 @@
└─StreamShare { id: 2 }
└─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t._row_id] }
└─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
- name: test simple approx_percentile with different approx_percentile interleaved with stateless + stateful simple aggs
sql: |
CREATE TABLE t (v1 int, v2 int);
SELECT sum(v1) as s1, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as x, count(*), max(v2) as m2, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v2) as y from t;
logical_plan: |-
LogicalProject { exprs: [sum(t.v1), approx_percentile($expr1), count, max(t.v2), approx_percentile($expr2)] }
└─LogicalAgg { aggs: [sum(t.v1), approx_percentile($expr1), count, max(t.v2), approx_percentile($expr2)] }
└─LogicalProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2] }
└─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] }
stream_plan: |-
StreamMaterialize { columns: [s1, x, count, m2, y], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64, sum0(count):Int64, max(max(t.v2)):Int32, approx_percentile:Float64] }
├─StreamRowMerge { output: [approx_percentile:Float64, approx_percentile:Float64] }
│ ├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ │ └─StreamExchange { dist: Single }
│ │ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ │ └─StreamShare { id: 2 }
│ │ └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] }
│ │ └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
│ └─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ └─StreamExchange { dist: Single }
│ └─StreamLocalApproxPercentile { percentile_col: $expr2, quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ └─StreamShare { id: 2 }
│ └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] }
│ └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamSimpleAgg { aggs: [sum(sum(t.v1)), sum0(count), max(max(t.v2)), count] }
└─StreamExchange { dist: Single }
└─StreamHashAgg { group_key: [$expr5], aggs: [sum(t.v1), count, max(t.v2)] }
└─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr3, t.v2, t.v2::Float64 as $expr4, t._row_id, Vnode(t._row_id) as $expr5] }
└─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
148 changes: 100 additions & 48 deletions src/frontend/src/optimizer/plan_node/logical_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,38 +76,17 @@ impl LogicalAgg {
let mut core = self.core.clone();

// ====== Handle approx percentile aggs
let SeparatedAggInfo { normal, approx } = self.separate_normal_and_special_agg();

let AggInfo {
calls: non_approx_percentile_agg_calls,
col_mapping: non_approx_percentile_col_mapping,
} = normal;
let AggInfo {
calls: approx_percentile_agg_calls,
col_mapping: approx_percentile_col_mapping,
} = approx;

let needs_row_merge = (!non_approx_percentile_agg_calls.is_empty()
&& !approx_percentile_agg_calls.is_empty())
|| approx_percentile_agg_calls.len() >= 2;
core.input = if needs_row_merge {
// If there's row merge, we need to share the input.
StreamShare::new_from_input(stream_input.clone()).into()
} else {
stream_input
};
core.agg_calls = non_approx_percentile_agg_calls;
let (non_approx_percentile_col_mapping, approx_percentile_col_mapping, approx_percentile) =
self.prepare_approx_percentile(&mut core, stream_input.clone())?;

let approx_percentile =
self.build_approx_percentile_aggs(core.input.clone(), &approx_percentile_agg_calls)?;

// ====== Handle normal aggs
if core.agg_calls.is_empty() {
if let Some(approx_percentile) = approx_percentile {
return Ok(approx_percentile);
};
bail!("expected at least one agg call");
}

// ====== Handle normal aggs
let total_agg_calls = core
.agg_calls
.iter()
Expand All @@ -123,21 +102,12 @@ impl LogicalAgg {
new_stream_simple_agg(Agg::new(total_agg_calls, IndexSet::empty(), exchange));

// ====== Merge approx percentile and normal aggs
if let Some(approx_percentile) = approx_percentile {
if needs_row_merge {
let row_merge = StreamRowMerge::new(
approx_percentile,
global_agg.into(),
approx_percentile_col_mapping,
non_approx_percentile_col_mapping,
)?;
Ok(row_merge.into())
} else {
Ok(approx_percentile)
}
} else {
Ok(global_agg.into())
}
Self::add_row_merge_if_needed(
approx_percentile,
global_agg.into(),
approx_percentile_col_mapping,
non_approx_percentile_col_mapping,
)
}

/// Generate plan for stateless/stateful 2-phase streaming agg.
Expand All @@ -148,10 +118,21 @@ impl LogicalAgg {
stream_input: PlanRef,
dist_key: &[usize],
) -> Result<PlanRef> {
let input_col_num = stream_input.schema().len();
let mut core = self.core.clone();

let (non_approx_percentile_col_mapping, approx_percentile_col_mapping, approx_percentile) =
self.prepare_approx_percentile(&mut core, stream_input.clone())?;

if core.agg_calls.is_empty() {
if let Some(approx_percentile) = approx_percentile {
return Ok(approx_percentile);
};
bail!("expected at least one agg call");
}

// Generate vnode via project
// TODO(kwannoel): We should apply Project optimization rules here.
let input_col_num = stream_input.schema().len(); // get schema len before moving `stream_input`.
let project = StreamProject::new(generic::Project::with_vnode_col(stream_input, dist_key));
let vnode_col_idx = project.base.schema().len() - 1;

Expand All @@ -160,7 +141,7 @@ impl LogicalAgg {
local_group_key.insert(vnode_col_idx);
let n_local_group_key = local_group_key.len();
let local_agg = new_stream_hash_agg(
Agg::new(self.agg_calls().to_vec(), local_group_key, project.into()),
Agg::new(core.agg_calls.to_vec(), local_group_key, project.into()),
Some(vnode_col_idx),
);
// Global group key excludes vnode.
Expand All @@ -173,11 +154,11 @@ impl LogicalAgg {
.expect("some input group key could not be mapped");

// Generate global agg step
if self.group_key().is_empty() {
let global_agg = if self.group_key().is_empty() {
let exchange =
RequiredDist::single().enforce_if_not_satisfies(local_agg.into(), &Order::any())?;
let global_agg = new_stream_simple_agg(Agg::new(
self.agg_calls()
core.agg_calls
.iter()
.enumerate()
.map(|(partial_output_idx, agg_call)| {
Expand All @@ -187,15 +168,15 @@ impl LogicalAgg {
global_group_key.into_iter().collect(),
exchange,
));
Ok(global_agg.into())
global_agg.into()
} else {
let exchange = RequiredDist::shard_by_key(input_col_num, &global_group_key)
.enforce_if_not_satisfies(local_agg.into(), &Order::any())?;
// Local phase should have reordered the group keys into their required order.
// we can just follow it.
let global_agg = new_stream_hash_agg(
Agg::new(
self.agg_calls()
core.agg_calls
.iter()
.enumerate()
.map(|(partial_output_idx, agg_call)| {
Expand All @@ -208,8 +189,14 @@ impl LogicalAgg {
),
None,
);
Ok(global_agg.into())
}
global_agg.into()
};
Self::add_row_merge_if_needed(
approx_percentile,
global_agg,
approx_percentile_col_mapping,
non_approx_percentile_col_mapping,
)
}

fn gen_single_plan(&self, stream_input: PlanRef) -> Result<PlanRef> {
Expand Down Expand Up @@ -304,6 +291,71 @@ impl LogicalAgg {
}
}

/// Prepares metadata and the `approx_percentile` plan, if there's one present.
/// It may modify `core.agg_calls` to separate normal agg and approx percentile agg,
/// and `core.input` to share the input via `StreamShare`,
/// to both approx percentile agg and normal agg.
fn prepare_approx_percentile(
&self,
core: &mut Agg<PlanRef>,
stream_input: PlanRef,
) -> Result<(ColIndexMapping, ColIndexMapping, Option<PlanRef>)> {
let SeparatedAggInfo { normal, approx } = self.separate_normal_and_special_agg();

let AggInfo {
calls: non_approx_percentile_agg_calls,
col_mapping: non_approx_percentile_col_mapping,
} = normal;
let AggInfo {
calls: approx_percentile_agg_calls,
col_mapping: approx_percentile_col_mapping,
} = approx;
if !self.group_key().is_empty() && !approx_percentile_agg_calls.is_empty() {
bail_not_implemented!("two-phase approx percentile agg with group key, please use single phase agg for approx_percentile with group key");
}

// Either we have approx percentile aggs and non_approx percentile aggs,
// or we have at least 2 approx percentile aggs.
let needs_row_merge = (!non_approx_percentile_agg_calls.is_empty()
&& !approx_percentile_agg_calls.is_empty())
|| approx_percentile_agg_calls.len() >= 2;
core.input = if needs_row_merge {
// If there's row merge, we need to share the input.
StreamShare::new_from_input(stream_input.clone()).into()
} else {
stream_input
};
core.agg_calls = non_approx_percentile_agg_calls;

let approx_percentile =
self.build_approx_percentile_aggs(core.input.clone(), &approx_percentile_agg_calls)?;
Ok((
non_approx_percentile_col_mapping,
approx_percentile_col_mapping,
approx_percentile,
))
}

/// Add `RowMerge` if needed
fn add_row_merge_if_needed(
approx_percentile: Option<PlanRef>,
global_agg: PlanRef,
approx_percentile_col_mapping: ColIndexMapping,
non_approx_percentile_col_mapping: ColIndexMapping,
) -> Result<PlanRef> {
if let Some(approx_percentile) = approx_percentile {
let row_merge = StreamRowMerge::new(
approx_percentile,
global_agg,
approx_percentile_col_mapping,
non_approx_percentile_col_mapping,
)?;
Ok(row_merge.into())
} else {
Ok(global_agg)
}
}

fn separate_normal_and_special_agg(&self) -> SeparatedAggInfo {
let estimated_len = self.agg_calls().len() - 1;
let mut approx_percentile_agg_calls = Vec::with_capacity(estimated_len);
Expand Down

0 comments on commit f09f195

Please sign in to comment.