Skip to content

Commit

Permalink
fix(stream): fix functional dependencies for row merge (#18072)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored Aug 17, 2024
1 parent 31e52d4 commit 83b3647
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,15 @@ statement ok
create materialized view m1 as
select
approx_percentile(0.01, 0.01) within group (order by p_col) as p01,
sum(p_col),
sum(p_col) as s,
approx_percentile(0.5, 0.01) within group (order by p_col) as p50,
count(*),
approx_percentile(0.99, 0.01) within group (order by p_col) as p99
count(*)::double + approx_percentile(0.99, 0.01) within group (order by p_col) as p99
from t;

query I
select * from m1;
select p01, s, p50, round(p99::numeric, 2) from m1;
----
-982.5779489474152 0 0 2001 982.5779489474152
-982.5779489474152 0 0 2983.58

# Test state encode / decode
onlyif can-use-recover
Expand All @@ -45,7 +44,7 @@ sleep 10s
query I
select * from m1;
----
-982.5779489474152 0 0 2001 982.5779489474152
-982.5779489474152 0 0 2983.5779489474152

# Test 0<x<1 values
statement ok
Expand All @@ -60,7 +59,7 @@ flush;
query I
select * from m1;
----
-963.1209598593477 0.5501000000000007 0.00009999833511933609 3002 963.1209598593477
-963.1209598593477 0.5501000000000007 0.00009999833511933609 3965.1209598593477

query I
select
Expand Down
9 changes: 8 additions & 1 deletion src/frontend/planner_test/tests/testdata/input/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1045,7 +1045,14 @@
- name: test simple approx_percentile with different approx_percentile interleaved with stateless simple aggs
sql: |
CREATE TABLE t (v1 int, v2 int);
SELECT sum(v1) as s1, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as x, count(*), sum(v2) as s2, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v2) as y from t;
SELECT sum(v1) as s1, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as x, count(*), sum(v2) + approx_percentile(0.9, 0.01) WITHIN GROUP (order by v2) as y from t;
expected_outputs:
- logical_plan
- stream_plan
- name: test simple approx_percentile with duplicated approx_percentile interleaved with stateless simple aggs
sql: |
CREATE TABLE t (v1 int, v2 int);
SELECT sum(v1) as s1, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as x, count(*), sum(v2) + approx_percentile(0.5, 0.01) WITHIN GROUP (order by v2) as y from t;
expected_outputs:
- logical_plan
- stream_plan
Expand Down
79 changes: 56 additions & 23 deletions src/frontend/planner_test/tests/testdata/output/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1988,34 +1988,67 @@
- name: test simple approx_percentile with different approx_percentile interleaved with stateless simple aggs
sql: |
CREATE TABLE t (v1 int, v2 int);
SELECT sum(v1) as s1, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as x, count(*), sum(v2) as s2, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v2) as y from t;
SELECT sum(v1) as s1, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as x, count(*), sum(v2) + approx_percentile(0.9, 0.01) WITHIN GROUP (order by v2) as y from t;
logical_plan: |-
LogicalProject { exprs: [sum(t.v1), approx_percentile($expr1), count, sum(t.v2), approx_percentile($expr2)] }
LogicalProject { exprs: [sum(t.v1), approx_percentile($expr1), count, (sum(t.v2)::Float64 + approx_percentile($expr2)) as $expr3] }
└─LogicalAgg { aggs: [sum(t.v1), approx_percentile($expr1), count, sum(t.v2), approx_percentile($expr2)] }
└─LogicalProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2] }
└─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] }
stream_plan: |-
StreamMaterialize { columns: [s1, x, count, s2, y], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64, sum0(count):Int64, sum(sum(t.v2)):Int64, approx_percentile:Float64] }
├─StreamRowMerge { output: [approx_percentile:Float64, approx_percentile:Float64] }
│ ├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ │ └─StreamExchange { dist: Single }
│ │ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ │ └─StreamShare { id: 2 }
│ │ └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] }
│ │ └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
│ └─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ └─StreamExchange { dist: Single }
│ └─StreamLocalApproxPercentile { percentile_col: $expr2, quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ └─StreamShare { id: 2 }
│ └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] }
│ └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamSimpleAgg { aggs: [sum(sum(t.v1)), sum0(count), sum(sum(t.v2)), count] }
└─StreamExchange { dist: Single }
└─StreamStatelessSimpleAgg { aggs: [sum(t.v1), count, sum(t.v2)] }
└─StreamShare { id: 2 }
└─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] }
└─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
StreamMaterialize { columns: [s1, x, count, y], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamProject { exprs: [sum(sum(t.v1)), approx_percentile, sum0(count), (sum(sum(t.v2))::Float64 + approx_percentile) as $expr3] }
└─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64, sum0(count):Int64, sum(sum(t.v2)):Int64, approx_percentile:Float64] }
├─StreamRowMerge { output: [approx_percentile:Float64, approx_percentile:Float64] }
│ ├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ │ └─StreamExchange { dist: Single }
│ │ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ │ └─StreamShare { id: 2 }
│ │ └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] }
│ │ └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
│ └─StreamGlobalApproxPercentile { quantile: 0.9:Float64, relative_error: 0.01:Float64 }
│ └─StreamExchange { dist: Single }
│ └─StreamLocalApproxPercentile { percentile_col: $expr2, quantile: 0.9:Float64, relative_error: 0.01:Float64 }
│ └─StreamShare { id: 2 }
│ └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] }
│ └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamSimpleAgg { aggs: [sum(sum(t.v1)), sum0(count), sum(sum(t.v2)), count] }
└─StreamExchange { dist: Single }
└─StreamStatelessSimpleAgg { aggs: [sum(t.v1), count, sum(t.v2)] }
└─StreamShare { id: 2 }
└─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] }
└─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
- name: test simple approx_percentile with duplicated approx_percentile interleaved with stateless simple aggs
sql: |
CREATE TABLE t (v1 int, v2 int);
SELECT sum(v1) as s1, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as x, count(*), sum(v2) + approx_percentile(0.5, 0.01) WITHIN GROUP (order by v2) as y from t;
logical_plan: |-
LogicalProject { exprs: [sum(t.v1), approx_percentile($expr1), count, (sum(t.v2)::Float64 + approx_percentile($expr2)) as $expr3] }
└─LogicalAgg { aggs: [sum(t.v1), approx_percentile($expr1), count, sum(t.v2), approx_percentile($expr2)] }
└─LogicalProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2] }
└─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] }
stream_plan: |-
StreamMaterialize { columns: [s1, x, count, y], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamProject { exprs: [sum(sum(t.v1)), approx_percentile, sum0(count), (sum(sum(t.v2))::Float64 + approx_percentile) as $expr3] }
└─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64, sum0(count):Int64, sum(sum(t.v2)):Int64, approx_percentile:Float64] }
├─StreamRowMerge { output: [approx_percentile:Float64, approx_percentile:Float64] }
│ ├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ │ └─StreamExchange { dist: Single }
│ │ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ │ └─StreamShare { id: 2 }
│ │ └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] }
│ │ └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
│ └─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ └─StreamExchange { dist: Single }
│ └─StreamLocalApproxPercentile { percentile_col: $expr2, quantile: 0.5:Float64, relative_error: 0.01:Float64 }
│ └─StreamShare { id: 2 }
│ └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] }
│ └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamSimpleAgg { aggs: [sum(sum(t.v1)), sum0(count), sum(sum(t.v2)), count] }
└─StreamExchange { dist: Single }
└─StreamStatelessSimpleAgg { aggs: [sum(t.v1), count, sum(t.v2)] }
└─StreamShare { id: 2 }
└─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] }
└─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
- name: test simple approx_percentile with descending order
sql: |
CREATE TABLE t (v1 int, v2 int);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use fixedbitset::FixedBitSet;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{Field, Schema};
Expand All @@ -28,7 +27,7 @@ use crate::optimizer::plan_node::utils::{childless_record, Distill, TableCatalog
use crate::optimizer::plan_node::{
ExprRewritable, PlanAggCall, PlanBase, PlanTreeNodeUnary, Stream, StreamNode,
};
use crate::optimizer::property::Distribution;
use crate::optimizer::property::{Distribution, FunctionalDependencySet};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::PlanRef;

Expand All @@ -48,12 +47,13 @@ impl StreamGlobalApproxPercentile {
DataType::Float64,
"approx_percentile",
)]);
let functional_dependency = FunctionalDependencySet::with_key(1, &[]);
let watermark_columns = FixedBitSet::with_capacity(1);
let base = PlanBase::new_stream(
input.ctx(),
schema,
Some(vec![]),
input.functional_dependency().clone(),
functional_dependency,
Distribution::Single,
input.append_only(),
input.emit_on_window_close(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::optimizer::plan_node::utils::{childless_record, watermark_pretty, Dis
use crate::optimizer::plan_node::{
ExprRewritable, PlanAggCall, PlanBase, PlanTreeNodeUnary, Stream, StreamNode,
};
use crate::optimizer::property::FunctionalDependencySet;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::PlanRef;

Expand All @@ -50,11 +51,12 @@ impl StreamLocalApproxPercentile {
]);
// FIXME(kwannoel): How does watermark work with FixedBitSet
let watermark_columns = FixedBitSet::with_capacity(3);
let functional_dependency = FunctionalDependencySet::with_key(3, &[]);
let base = PlanBase::new_stream(
input.ctx(),
schema,
input.stream_key().map(|k| k.to_vec()),
input.functional_dependency().clone(),
functional_dependency,
input.distribution().clone(),
input.append_only(),
input.emit_on_window_close(),
Expand Down
5 changes: 4 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_row_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::optimizer::plan_node::utils::{childless_record, Distill};
use crate::optimizer::plan_node::{
ExprRewritable, PlanBase, PlanTreeNodeBinary, Stream, StreamNode,
};
use crate::optimizer::property::FunctionalDependencySet;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::PlanRef;

Expand Down Expand Up @@ -57,6 +58,8 @@ impl StreamRowMerge {
assert_eq!(lhs_mapping.target_size(), rhs_mapping.target_size());
assert_eq!(lhs_input.distribution(), rhs_input.distribution());
assert_eq!(lhs_input.stream_key(), rhs_input.stream_key());
let functional_dependency =
FunctionalDependencySet::with_key(lhs_mapping.target_size(), &[]);
let mut schema_fields = Vec::with_capacity(lhs_mapping.target_size());
let o2i_lhs = lhs_mapping
.inverse()
Expand Down Expand Up @@ -84,7 +87,7 @@ impl StreamRowMerge {
lhs_input.ctx(),
schema,
lhs_input.stream_key().map(|k| k.to_vec()),
lhs_input.functional_dependency().clone(),
functional_dependency,
lhs_input.distribution().clone(),
lhs_input.append_only(),
lhs_input.emit_on_window_close(),
Expand Down

0 comments on commit 83b3647

Please sign in to comment.