Skip to content

Commit

Permalink
fix(optimizer): enforce input of BatchMaxOneRow to be singleton (ri…
Browse files Browse the repository at this point in the history
…singwavelabs#19452)

Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Nov 20, 2024
1 parent 5b224fa commit c325f42
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 3 deletions.
1 change: 1 addition & 0 deletions e2e_test/batch/distribution_mode.slt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ include ./aggregate/*.slt.part
include ./types/*.slt.part
include ./functions/*.slt.part
include ./over_window/main.slt.part
include ./subquery/**/*.slt.part

statement ok
SET QUERY_MODE TO auto;
1 change: 1 addition & 0 deletions e2e_test/batch/local_mode.slt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ include ./types/*.slt.part
include ./catalog/*.slt.part
include ./functions/*.slt.part
include ./over_window/main.slt.part
include ./subquery/**/*.slt.part

statement ok
SET QUERY_MODE TO auto;
Expand Down
4 changes: 3 additions & 1 deletion e2e_test/batch/subquery/scalar_subquery.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ select (select x from t) x, 1 one;
statement error Scalar subquery might produce more than one row
create materialized view mv as select (select x from t) x, 1 one;

# Use a random value here to occasionally make it not in the same shard as `114514`,
# demonstrating that `BatchMaxOneRow` correctly handles distributed subqueries.
statement ok
insert into t values (1919810);
insert into t values (extract(epoch from now()));

# Cannot query as the cardinality of the subquery is now 2
query error Scalar subquery produced more than one row
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
expected_outputs:
- logical_plan
- optimized_logical_plan_for_batch
- batch_plan
- stream_error
- sql: |
create table t(x int);
Expand All @@ -28,27 +29,31 @@
expected_outputs:
- logical_plan
- optimized_logical_plan_for_batch
- batch_plan
- stream_error
- sql: |
create table t(x int);
select (select x from t) + 1 a from t;
expected_outputs:
- logical_plan
- optimized_logical_plan_for_batch
- batch_plan
- stream_error
- sql: |
create table t(x int);
select (select x from t) a, (select 1) b;
expected_outputs:
- logical_plan
- optimized_logical_plan_for_batch
- batch_plan
- stream_error
- sql: |
create table t(x int);
select x + (select x + (select x as v1 from t) as v2 from t) as v3 from t;
expected_outputs:
- logical_plan
- optimized_logical_plan_for_batch
- batch_plan
- stream_error
- sql: |
create table t1 (x int, y int);
Expand Down
47 changes: 47 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/subquery_expr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@
├─LogicalScan { table: t, columns: [] }
└─LogicalMaxOneRow
└─LogicalScan { table: t, columns: [t.x] }
batch_plan: |-
BatchProject { exprs: [t.x, 1:Int32] }
└─BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all }
├─BatchExchange { order: [], dist: Single }
│ └─BatchScan { table: t, columns: [], distribution: SomeShard }
└─BatchMaxOneRow
└─BatchExchange { order: [], dist: Single }
└─BatchScan { table: t, columns: [t.x], distribution: SomeShard }
stream_error: Scalar subquery might produce more than one row.
- sql: |
create table t(x int);
Expand Down Expand Up @@ -76,6 +84,16 @@
└─LogicalMaxOneRow
└─LogicalTopN { order: [t.x ASC], limit: 1, offset: 0, with_ties: true }
└─LogicalScan { table: t, columns: [t.x] }
batch_plan: |-
BatchProject { exprs: [t.x, 1:Int32] }
└─BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all }
├─BatchExchange { order: [], dist: Single }
│ └─BatchScan { table: t, columns: [], distribution: SomeShard }
└─BatchMaxOneRow
└─BatchTopN { order: [t.x ASC], limit: 1, offset: 0, with_ties: true }
└─BatchExchange { order: [], dist: Single }
└─BatchTopN { order: [t.x ASC], limit: 1, offset: 0, with_ties: true }
└─BatchScan { table: t, columns: [t.x], distribution: SomeShard }
stream_error: Scalar subquery might produce more than one row.
- sql: |
create table t(x int);
Expand All @@ -92,6 +110,14 @@
├─LogicalScan { table: t, columns: [] }
└─LogicalMaxOneRow
└─LogicalScan { table: t, columns: [t.x] }
batch_plan: |-
BatchProject { exprs: [(t.x + 1:Int32) as $expr1] }
└─BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all }
├─BatchExchange { order: [], dist: Single }
│ └─BatchScan { table: t, columns: [], distribution: SomeShard }
└─BatchMaxOneRow
└─BatchExchange { order: [], dist: Single }
└─BatchScan { table: t, columns: [t.x], distribution: SomeShard }
stream_error: Scalar subquery might produce more than one row.
- sql: |
create table t(x int);
Expand All @@ -112,6 +138,14 @@
│ └─LogicalMaxOneRow
│ └─LogicalScan { table: t, columns: [t.x] }
└─LogicalValues { rows: [[1:Int32]], schema: Schema { fields: [1:Int32:Int32] } }
batch_plan: |-
BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all }
├─BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all }
│ ├─BatchValues { rows: [[]] }
│ └─BatchMaxOneRow
│ └─BatchExchange { order: [], dist: Single }
│ └─BatchScan { table: t, columns: [t.x], distribution: SomeShard }
└─BatchValues { rows: [[1:Int32]] }
stream_error: Scalar subquery might produce more than one row.
- sql: |
create table t(x int);
Expand All @@ -135,6 +169,19 @@
├─LogicalScan { table: t, columns: [t.x] }
└─LogicalMaxOneRow
└─LogicalScan { table: t, columns: [t.x] }
batch_plan: |-
BatchProject { exprs: [(t.x + $expr1) as $expr2] }
└─BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all }
├─BatchExchange { order: [], dist: Single }
│ └─BatchScan { table: t, columns: [t.x], distribution: SomeShard }
└─BatchMaxOneRow
└─BatchProject { exprs: [(t.x + t.x) as $expr1] }
└─BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all }
├─BatchExchange { order: [], dist: Single }
│ └─BatchScan { table: t, columns: [t.x], distribution: SomeShard }
└─BatchMaxOneRow
└─BatchExchange { order: [], dist: Single }
└─BatchScan { table: t, columns: [t.x], distribution: SomeShard }
stream_error: Scalar subquery might produce more than one row.
- sql: |
create table t1 (x int, y int);
Expand Down
9 changes: 7 additions & 2 deletions src/frontend/src/optimizer/plan_node/batch_max_one_row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use super::{
use crate::error::Result;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::ToLocalBatch;
use crate::optimizer::property::{Order, RequiredDist};

/// [`BatchMaxOneRow`] fetches up to one row from the input, returning an error
/// if the input contains more than one row at runtime.
Expand Down Expand Up @@ -66,7 +67,9 @@ impl Distill for BatchMaxOneRow {

impl ToDistributedBatch for BatchMaxOneRow {
fn to_distributed(&self) -> Result<PlanRef> {
Ok(self.clone_with_input(self.input().to_distributed()?).into())
let new_input = RequiredDist::single()
.enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
Ok(self.clone_with_input(new_input).into())
}
}

Expand All @@ -78,7 +81,9 @@ impl ToBatchPb for BatchMaxOneRow {

impl ToLocalBatch for BatchMaxOneRow {
fn to_local(&self) -> Result<PlanRef> {
Ok(self.clone_with_input(self.input().to_local()?).into())
let new_input = RequiredDist::single()
.enforce_if_not_satisfies(self.input().to_local()?, &Order::any())?;
Ok(self.clone_with_input(new_input).into())
}
}

Expand Down

0 comments on commit c325f42

Please sign in to comment.