From b3db192e2a34eb43395ba178623669d6e7157953 Mon Sep 17 00:00:00 2001
From: "github-actions[bot]"
 <41898282+github-actions[bot]@users.noreply.github.com>
Date: Tue, 20 Aug 2024 15:12:04 +0000
Subject: [PATCH] fix(stream): fix functional dependencies for row merge
 (#18072) (#18142)

Co-authored-by: Noel Kwan <47273164+kwannoel@users.noreply.github.com>
---
 ..._approx_percentile_merge_stateless_agg.slt | 13 ++-
 .../tests/testdata/input/agg.yaml             |  9 ++-
 .../tests/testdata/output/agg.yaml            | 79 +++++++++++++------
 .../stream_global_approx_percentile.rs        |  6 +-
 .../stream_local_approx_percentile.rs         |  4 +-
 .../optimizer/plan_node/stream_row_merge.rs   |  5 +-
 6 files changed, 80 insertions(+), 36 deletions(-)

diff --git a/e2e_test/streaming/aggregate/two_phase_approx_percentile_merge_stateless_agg.slt b/e2e_test/streaming/aggregate/two_phase_approx_percentile_merge_stateless_agg.slt
index fd54f7200f958..49d5d781f79f0 100644
--- a/e2e_test/streaming/aggregate/two_phase_approx_percentile_merge_stateless_agg.slt
+++ b/e2e_test/streaming/aggregate/two_phase_approx_percentile_merge_stateless_agg.slt
@@ -23,16 +23,15 @@ statement ok
 create materialized view m1 as
  select
      approx_percentile(0.01, 0.01) within group (order by p_col) as p01,
-     sum(p_col),
+     sum(p_col) as s,
      approx_percentile(0.5, 0.01) within group (order by p_col) as p50,
-     count(*),
-     approx_percentile(0.99, 0.01) within group (order by p_col) as p99
+     count(*)::double + approx_percentile(0.99, 0.01) within group (order by p_col) as p99
  from t;
 
 query I
-select * from m1;
+select p01, s, p50, round(p99::numeric, 2)  from m1;
 ----
--982.5779489474152 0 0 2001 982.5779489474152
+-982.5779489474152 0 0 2983.58
 
 # Test state encode / decode
 onlyif can-use-recover
@@ -45,7 +44,7 @@ sleep 10s
 query I
 select * from m1;
 ----
--982.5779489474152 0 0 2001 982.5779489474152
+-982.5779489474152 0 0 2983.5779489474152
 
 # Test 0<x<1 values
 statement ok
@@ -60,7 +59,7 @@ flush;
 query I
 select * from m1;
 ----
--963.1209598593477 0.5501000000000007 0.00009999833511933609 3002 963.1209598593477
+-963.1209598593477 0.5501000000000007 0.00009999833511933609 3965.1209598593477
 
 query I
 select
diff --git a/src/frontend/planner_test/tests/testdata/input/agg.yaml b/src/frontend/planner_test/tests/testdata/input/agg.yaml
index f00c9f2b4065a..1979e4ea1fb77 100644
--- a/src/frontend/planner_test/tests/testdata/input/agg.yaml
+++ b/src/frontend/planner_test/tests/testdata/input/agg.yaml
@@ -1045,7 +1045,14 @@
 - name: test simple approx_percentile with different approx_percentile interleaved with stateless simple aggs
   sql: |
     CREATE TABLE t (v1 int, v2 int);
-    SELECT sum(v1) as s1, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as x, count(*), sum(v2) as s2, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v2) as y from t;
+    SELECT sum(v1) as s1, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as x, count(*), sum(v2) + approx_percentile(0.9, 0.01) WITHIN GROUP (order by v2) as y from t;
+  expected_outputs:
+    - logical_plan
+    - stream_plan
+- name: test simple approx_percentile with duplicated approx_percentile interleaved with stateless simple aggs
+  sql: |
+    CREATE TABLE t (v1 int, v2 int);
+    SELECT sum(v1) as s1, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as x, count(*), sum(v2) + approx_percentile(0.5, 0.01) WITHIN GROUP (order by v2) as y from t;
   expected_outputs:
     - logical_plan
     - stream_plan
diff --git a/src/frontend/planner_test/tests/testdata/output/agg.yaml b/src/frontend/planner_test/tests/testdata/output/agg.yaml
index f6d1af67b331e..eca739788bf6e 100644
--- a/src/frontend/planner_test/tests/testdata/output/agg.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/agg.yaml
@@ -1988,34 +1988,67 @@
 - name: test simple approx_percentile with different approx_percentile interleaved with stateless simple aggs
   sql: |
     CREATE TABLE t (v1 int, v2 int);
-    SELECT sum(v1) as s1, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as x, count(*), sum(v2) as s2, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v2) as y from t;
+    SELECT sum(v1) as s1, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as x, count(*), sum(v2) + approx_percentile(0.9, 0.01) WITHIN GROUP (order by v2) as y from t;
   logical_plan: |-
-    LogicalProject { exprs: [sum(t.v1), approx_percentile($expr1), count, sum(t.v2), approx_percentile($expr2)] }
+    LogicalProject { exprs: [sum(t.v1), approx_percentile($expr1), count, (sum(t.v2)::Float64 + approx_percentile($expr2)) as $expr3] }
     └─LogicalAgg { aggs: [sum(t.v1), approx_percentile($expr1), count, sum(t.v2), approx_percentile($expr2)] }
       └─LogicalProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2] }
         └─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] }
   stream_plan: |-
-    StreamMaterialize { columns: [s1, x, count, s2, y], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
-    └─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64, sum0(count):Int64, sum(sum(t.v2)):Int64, approx_percentile:Float64] }
-      ├─StreamRowMerge { output: [approx_percentile:Float64, approx_percentile:Float64] }
-      │ ├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }
-      │ │ └─StreamExchange { dist: Single }
-      │ │   └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 }
-      │ │     └─StreamShare { id: 2 }
-      │ │       └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] }
-      │ │         └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
-      │ └─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }
-      │   └─StreamExchange { dist: Single }
-      │     └─StreamLocalApproxPercentile { percentile_col: $expr2, quantile: 0.5:Float64, relative_error: 0.01:Float64 }
-      │       └─StreamShare { id: 2 }
-      │         └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] }
-      │           └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
-      └─StreamSimpleAgg { aggs: [sum(sum(t.v1)), sum0(count), sum(sum(t.v2)), count] }
-        └─StreamExchange { dist: Single }
-          └─StreamStatelessSimpleAgg { aggs: [sum(t.v1), count, sum(t.v2)] }
-            └─StreamShare { id: 2 }
-              └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] }
-                └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
+    StreamMaterialize { columns: [s1, x, count, y], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
+    └─StreamProject { exprs: [sum(sum(t.v1)), approx_percentile, sum0(count), (sum(sum(t.v2))::Float64 + approx_percentile) as $expr3] }
+      └─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64, sum0(count):Int64, sum(sum(t.v2)):Int64, approx_percentile:Float64] }
+        ├─StreamRowMerge { output: [approx_percentile:Float64, approx_percentile:Float64] }
+        │ ├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }
+        │ │ └─StreamExchange { dist: Single }
+        │ │   └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 }
+        │ │     └─StreamShare { id: 2 }
+        │ │       └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] }
+        │ │         └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
+        │ └─StreamGlobalApproxPercentile { quantile: 0.9:Float64, relative_error: 0.01:Float64 }
+        │   └─StreamExchange { dist: Single }
+        │     └─StreamLocalApproxPercentile { percentile_col: $expr2, quantile: 0.9:Float64, relative_error: 0.01:Float64 }
+        │       └─StreamShare { id: 2 }
+        │         └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] }
+        │           └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
+        └─StreamSimpleAgg { aggs: [sum(sum(t.v1)), sum0(count), sum(sum(t.v2)), count] }
+          └─StreamExchange { dist: Single }
+            └─StreamStatelessSimpleAgg { aggs: [sum(t.v1), count, sum(t.v2)] }
+              └─StreamShare { id: 2 }
+                └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] }
+                  └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
+- name: test simple approx_percentile with duplicated approx_percentile interleaved with stateless simple aggs
+  sql: |
+    CREATE TABLE t (v1 int, v2 int);
+    SELECT sum(v1) as s1, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as x, count(*), sum(v2) + approx_percentile(0.5, 0.01) WITHIN GROUP (order by v2) as y from t;
+  logical_plan: |-
+    LogicalProject { exprs: [sum(t.v1), approx_percentile($expr1), count, (sum(t.v2)::Float64 + approx_percentile($expr2)) as $expr3] }
+    └─LogicalAgg { aggs: [sum(t.v1), approx_percentile($expr1), count, sum(t.v2), approx_percentile($expr2)] }
+      └─LogicalProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2] }
+        └─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] }
+  stream_plan: |-
+    StreamMaterialize { columns: [s1, x, count, y], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
+    └─StreamProject { exprs: [sum(sum(t.v1)), approx_percentile, sum0(count), (sum(sum(t.v2))::Float64 + approx_percentile) as $expr3] }
+      └─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64, sum0(count):Int64, sum(sum(t.v2)):Int64, approx_percentile:Float64] }
+        ├─StreamRowMerge { output: [approx_percentile:Float64, approx_percentile:Float64] }
+        │ ├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }
+        │ │ └─StreamExchange { dist: Single }
+        │ │   └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 }
+        │ │     └─StreamShare { id: 2 }
+        │ │       └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] }
+        │ │         └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
+        │ └─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }
+        │   └─StreamExchange { dist: Single }
+        │     └─StreamLocalApproxPercentile { percentile_col: $expr2, quantile: 0.5:Float64, relative_error: 0.01:Float64 }
+        │       └─StreamShare { id: 2 }
+        │         └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] }
+        │           └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
+        └─StreamSimpleAgg { aggs: [sum(sum(t.v1)), sum0(count), sum(sum(t.v2)), count] }
+          └─StreamExchange { dist: Single }
+            └─StreamStatelessSimpleAgg { aggs: [sum(t.v1), count, sum(t.v2)] }
+              └─StreamShare { id: 2 }
+                └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] }
+                  └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
 - name: test simple approx_percentile with descending order
   sql: |
     CREATE TABLE t (v1 int, v2 int);
diff --git a/src/frontend/src/optimizer/plan_node/stream_global_approx_percentile.rs b/src/frontend/src/optimizer/plan_node/stream_global_approx_percentile.rs
index 2edd61a728acc..e2c795892e5f9 100644
--- a/src/frontend/src/optimizer/plan_node/stream_global_approx_percentile.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_global_approx_percentile.rs
@@ -11,7 +11,6 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
-
 use fixedbitset::FixedBitSet;
 use pretty_xmlish::{Pretty, XmlNode};
 use risingwave_common::catalog::{Field, Schema};
@@ -28,7 +27,7 @@ use crate::optimizer::plan_node::utils::{childless_record, Distill, TableCatalog
 use crate::optimizer::plan_node::{
     ExprRewritable, PlanAggCall, PlanBase, PlanTreeNodeUnary, Stream, StreamNode,
 };
-use crate::optimizer::property::Distribution;
+use crate::optimizer::property::{Distribution, FunctionalDependencySet};
 use crate::stream_fragmenter::BuildFragmentGraphState;
 use crate::PlanRef;
 
@@ -48,12 +47,13 @@ impl StreamGlobalApproxPercentile {
             DataType::Float64,
             "approx_percentile",
         )]);
+        let functional_dependency = FunctionalDependencySet::with_key(1, &[]);
         let watermark_columns = FixedBitSet::with_capacity(1);
         let base = PlanBase::new_stream(
             input.ctx(),
             schema,
             Some(vec![]),
-            input.functional_dependency().clone(),
+            functional_dependency,
             Distribution::Single,
             input.append_only(),
             input.emit_on_window_close(),
diff --git a/src/frontend/src/optimizer/plan_node/stream_local_approx_percentile.rs b/src/frontend/src/optimizer/plan_node/stream_local_approx_percentile.rs
index 20d27e41f8598..b5af9be49df05 100644
--- a/src/frontend/src/optimizer/plan_node/stream_local_approx_percentile.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_local_approx_percentile.rs
@@ -27,6 +27,7 @@ use crate::optimizer::plan_node::utils::{childless_record, watermark_pretty, Dis
 use crate::optimizer::plan_node::{
     ExprRewritable, PlanAggCall, PlanBase, PlanTreeNodeUnary, Stream, StreamNode,
 };
+use crate::optimizer::property::FunctionalDependencySet;
 use crate::stream_fragmenter::BuildFragmentGraphState;
 use crate::PlanRef;
 
@@ -50,11 +51,12 @@ impl StreamLocalApproxPercentile {
         ]);
         // FIXME(kwannoel): How does watermark work with FixedBitSet
         let watermark_columns = FixedBitSet::with_capacity(3);
+        let functional_dependency = FunctionalDependencySet::with_key(3, &[]);
         let base = PlanBase::new_stream(
             input.ctx(),
             schema,
             input.stream_key().map(|k| k.to_vec()),
-            input.functional_dependency().clone(),
+            functional_dependency,
             input.distribution().clone(),
             input.append_only(),
             input.emit_on_window_close(),
diff --git a/src/frontend/src/optimizer/plan_node/stream_row_merge.rs b/src/frontend/src/optimizer/plan_node/stream_row_merge.rs
index f4c2c894fdf66..1295702135ffc 100644
--- a/src/frontend/src/optimizer/plan_node/stream_row_merge.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_row_merge.rs
@@ -29,6 +29,7 @@ use crate::optimizer::plan_node::utils::{childless_record, Distill};
 use crate::optimizer::plan_node::{
     ExprRewritable, PlanBase, PlanTreeNodeBinary, Stream, StreamNode,
 };
+use crate::optimizer::property::FunctionalDependencySet;
 use crate::stream_fragmenter::BuildFragmentGraphState;
 use crate::PlanRef;
 
@@ -57,6 +58,8 @@ impl StreamRowMerge {
         assert_eq!(lhs_mapping.target_size(), rhs_mapping.target_size());
         assert_eq!(lhs_input.distribution(), rhs_input.distribution());
         assert_eq!(lhs_input.stream_key(), rhs_input.stream_key());
+        let functional_dependency =
+            FunctionalDependencySet::with_key(lhs_mapping.target_size(), &[]);
         let mut schema_fields = Vec::with_capacity(lhs_mapping.target_size());
         let o2i_lhs = lhs_mapping
             .inverse()
@@ -84,7 +87,7 @@ impl StreamRowMerge {
             lhs_input.ctx(),
             schema,
             lhs_input.stream_key().map(|k| k.to_vec()),
-            lhs_input.functional_dependency().clone(),
+            functional_dependency,
             lhs_input.distribution().clone(),
             lhs_input.append_only(),
             lhs_input.emit_on_window_close(),