diff --git a/e2e_test/batch/distribution_mode.slt b/e2e_test/batch/distribution_mode.slt index b680796277c13..f101e5c5446e3 100644 --- a/e2e_test/batch/distribution_mode.slt +++ b/e2e_test/batch/distribution_mode.slt @@ -13,6 +13,7 @@ include ./aggregate/*.slt.part include ./types/*.slt.part include ./functions/*.slt.part include ./over_window/main.slt.part +include ./subquery/**/*.slt.part statement ok SET QUERY_MODE TO auto; diff --git a/e2e_test/batch/local_mode.slt b/e2e_test/batch/local_mode.slt index 68df9f0d91950..a64fd49b85c8f 100644 --- a/e2e_test/batch/local_mode.slt +++ b/e2e_test/batch/local_mode.slt @@ -14,6 +14,7 @@ include ./types/*.slt.part include ./catalog/*.slt.part include ./functions/*.slt.part include ./over_window/main.slt.part +include ./subquery/**/*.slt.part statement ok SET QUERY_MODE TO auto; diff --git a/e2e_test/batch/subquery/scalar_subquery.slt.part b/e2e_test/batch/subquery/scalar_subquery.slt.part index a0676e98f245c..5bb08201b7a75 100644 --- a/e2e_test/batch/subquery/scalar_subquery.slt.part +++ b/e2e_test/batch/subquery/scalar_subquery.slt.part @@ -21,8 +21,10 @@ select (select x from t) x, 1 one; statement error Scalar subquery might produce more than one row create materialized view mv as select (select x from t) x, 1 one; +# Use a random value here to occasionally make it not in the same shard as `114514`, +# demonstrating that `BatchMaxOneRow` correctly handles distributed subqueries. statement ok -insert into t values (1919810); +insert into t values (extract(epoch from now())); # Cannot query as the cardinality of the subquery is now 2 query error Scalar subquery produced more than one row diff --git a/src/frontend/planner_test/tests/testdata/input/subquery_expr.yaml b/src/frontend/planner_test/tests/testdata/input/subquery_expr.yaml index d5b6149f70877..3604ca4fa0d0b 100644 --- a/src/frontend/planner_test/tests/testdata/input/subquery_expr.yaml +++ b/src/frontend/planner_test/tests/testdata/input/subquery_expr.yaml @@ -9,6 +9,7 @@ expected_outputs: - logical_plan - optimized_logical_plan_for_batch + - batch_plan - stream_error - sql: | create table t(x int); @@ -28,6 +29,7 @@ expected_outputs: - logical_plan - optimized_logical_plan_for_batch + - batch_plan - stream_error - sql: | create table t(x int); @@ -35,6 +37,7 @@ expected_outputs: - logical_plan - optimized_logical_plan_for_batch + - batch_plan - stream_error - sql: | create table t(x int); @@ -42,6 +45,7 @@ expected_outputs: - logical_plan - optimized_logical_plan_for_batch + - batch_plan - stream_error - sql: | create table t(x int); @@ -49,6 +53,7 @@ expected_outputs: - logical_plan - optimized_logical_plan_for_batch + - batch_plan - stream_error - sql: | create table t1 (x int, y int); diff --git a/src/frontend/planner_test/tests/testdata/output/subquery_expr.yaml b/src/frontend/planner_test/tests/testdata/output/subquery_expr.yaml index a43a1d2df7166..519b876c8c67d 100644 --- a/src/frontend/planner_test/tests/testdata/output/subquery_expr.yaml +++ b/src/frontend/planner_test/tests/testdata/output/subquery_expr.yaml @@ -26,6 +26,14 @@ ├─LogicalScan { table: t, columns: [] } └─LogicalMaxOneRow └─LogicalScan { table: t, columns: [t.x] } + batch_plan: |- + BatchProject { exprs: [t.x, 1:Int32] } + └─BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all } + ├─BatchExchange { order: [], dist: Single } + │ └─BatchScan { table: t, columns: [], distribution: SomeShard } + └─BatchMaxOneRow + └─BatchExchange { order: [], dist: Single } + └─BatchScan { table: t, columns: [t.x], distribution: SomeShard } stream_error: Scalar subquery might produce more than one row. - sql: | create table t(x int); @@ -76,6 +84,16 @@ └─LogicalMaxOneRow └─LogicalTopN { order: [t.x ASC], limit: 1, offset: 0, with_ties: true } └─LogicalScan { table: t, columns: [t.x] } + batch_plan: |- + BatchProject { exprs: [t.x, 1:Int32] } + └─BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all } + ├─BatchExchange { order: [], dist: Single } + │ └─BatchScan { table: t, columns: [], distribution: SomeShard } + └─BatchMaxOneRow + └─BatchTopN { order: [t.x ASC], limit: 1, offset: 0, with_ties: true } + └─BatchExchange { order: [], dist: Single } + └─BatchTopN { order: [t.x ASC], limit: 1, offset: 0, with_ties: true } + └─BatchScan { table: t, columns: [t.x], distribution: SomeShard } stream_error: Scalar subquery might produce more than one row. - sql: | create table t(x int); @@ -92,6 +110,14 @@ ├─LogicalScan { table: t, columns: [] } └─LogicalMaxOneRow └─LogicalScan { table: t, columns: [t.x] } + batch_plan: |- + BatchProject { exprs: [(t.x + 1:Int32) as $expr1] } + └─BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all } + ├─BatchExchange { order: [], dist: Single } + │ └─BatchScan { table: t, columns: [], distribution: SomeShard } + └─BatchMaxOneRow + └─BatchExchange { order: [], dist: Single } + └─BatchScan { table: t, columns: [t.x], distribution: SomeShard } stream_error: Scalar subquery might produce more than one row. - sql: | create table t(x int); @@ -112,6 +138,14 @@ │ └─LogicalMaxOneRow │ └─LogicalScan { table: t, columns: [t.x] } └─LogicalValues { rows: [[1:Int32]], schema: Schema { fields: [1:Int32:Int32] } } + batch_plan: |- + BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all } + ├─BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all } + │ ├─BatchValues { rows: [[]] } + │ └─BatchMaxOneRow + │ └─BatchExchange { order: [], dist: Single } + │ └─BatchScan { table: t, columns: [t.x], distribution: SomeShard } + └─BatchValues { rows: [[1:Int32]] } stream_error: Scalar subquery might produce more than one row. - sql: | create table t(x int); @@ -135,6 +169,19 @@ ├─LogicalScan { table: t, columns: [t.x] } └─LogicalMaxOneRow └─LogicalScan { table: t, columns: [t.x] } + batch_plan: |- + BatchProject { exprs: [(t.x + $expr1) as $expr2] } + └─BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all } + ├─BatchExchange { order: [], dist: Single } + │ └─BatchScan { table: t, columns: [t.x], distribution: SomeShard } + └─BatchMaxOneRow + └─BatchProject { exprs: [(t.x + t.x) as $expr1] } + └─BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all } + ├─BatchExchange { order: [], dist: Single } + │ └─BatchScan { table: t, columns: [t.x], distribution: SomeShard } + └─BatchMaxOneRow + └─BatchExchange { order: [], dist: Single } + └─BatchScan { table: t, columns: [t.x], distribution: SomeShard } stream_error: Scalar subquery might produce more than one row. - sql: | create table t1 (x int, y int); diff --git a/src/frontend/src/optimizer/plan_node/batch_max_one_row.rs b/src/frontend/src/optimizer/plan_node/batch_max_one_row.rs index e35d6976672c4..4b3d0de43cb66 100644 --- a/src/frontend/src/optimizer/plan_node/batch_max_one_row.rs +++ b/src/frontend/src/optimizer/plan_node/batch_max_one_row.rs @@ -25,6 +25,7 @@ use super::{ use crate::error::Result; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::ToLocalBatch; +use crate::optimizer::property::{Order, RequiredDist}; /// [`BatchMaxOneRow`] fetches up to one row from the input, returning an error /// if the input contains more than one row at runtime. @@ -66,7 +67,9 @@ impl Distill for BatchMaxOneRow { impl ToDistributedBatch for BatchMaxOneRow { fn to_distributed(&self) -> Result { - Ok(self.clone_with_input(self.input().to_distributed()?).into()) + let new_input = RequiredDist::single() + .enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?; + Ok(self.clone_with_input(new_input).into()) } } @@ -78,7 +81,9 @@ impl ToBatchPb for BatchMaxOneRow { impl ToLocalBatch for BatchMaxOneRow { fn to_local(&self) -> Result { - Ok(self.clone_with_input(self.input().to_local()?).into()) + let new_input = RequiredDist::single() + .enforce_if_not_satisfies(self.input().to_local()?, &Order::any())?; + Ok(self.clone_with_input(new_input).into()) } }