From 0b0b227af594e5a512edb6b663e7ab5a96ae623e Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Tue, 28 Nov 2023 16:38:16 +0800 Subject: [PATCH] test: update planner tests Signed-off-by: Richard Chien --- .../tests/testdata/output/column_pruning.yaml | 7 +- .../testdata/output/over_window_function.yaml | 85 ++++++++++--------- 2 files changed, 48 insertions(+), 44 deletions(-) diff --git a/src/frontend/planner_test/tests/testdata/output/column_pruning.yaml b/src/frontend/planner_test/tests/testdata/output/column_pruning.yaml index dffafd3bf4c79..47c343d33d694 100644 --- a/src/frontend/planner_test/tests/testdata/output/column_pruning.yaml +++ b/src/frontend/planner_test/tests/testdata/output/column_pruning.yaml @@ -151,9 +151,10 @@ optimized_logical_plan_for_batch: |- LogicalProject { exprs: [row_number] } └─LogicalOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.y ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } - └─LogicalFilter { predicate: (t.y > 0:Int32) } - └─LogicalTopN { order: [t.y ASC], limit: 3, offset: 0, group_key: [t.x] } - └─LogicalScan { table: t, output_columns: [t.x, t.y], required_columns: [t.x, t.y, t.z], predicate: (t.z > 0:Int32) AND (t.x > 0:Int32) } + └─LogicalProject { exprs: [t.x, t.y] } + └─LogicalFilter { predicate: (t.z > 0:Int32) AND (t.y > 0:Int32) } + └─LogicalTopN { order: [t.y ASC], limit: 3, offset: 0, group_key: [t.x] } + └─LogicalScan { table: t, columns: [t.x, t.y, t.z], predicate: (t.x > 0:Int32) } - name: mixed sql: | create table t (v1 bigint, v2 double precision, v3 int); diff --git a/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml b/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml index fd7a802524848..90b033024919f 100644 --- a/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml +++ b/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml @@ -311,24 +311,24 @@ BatchExchange { order: [], dist: Single } └─BatchProject { exprs: [t.x, t.y, t.z, Sqrt(((sum::Decimal - (($expr4 * $expr4) / $expr5)) / $expr5)) as $expr6, Case((count <= 1:Int64), null:Decimal, Sqrt(((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)) / (count - 1:Int64)::Decimal))) as $expr7] } └─BatchProject { exprs: [t.x, t.y, t.z, $expr2, $expr1, $expr3, sum, sum, count, sum, sum, count, sum::Decimal as $expr4, count::Decimal as $expr5] } - └─BatchFilter { predicate: (t.x > 0:Int32) AND (Sqrt(((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)) / count::Decimal)) <= 3.0:Decimal) AND (Case((count <= 1:Int64), null:Decimal, Sqrt(((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)) / (count - 1:Int64)::Decimal))) > 1.0:Decimal) } + └─BatchFilter { predicate: (t.y > 0:Int32) AND (t.x > 0:Int32) AND (Sqrt(((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)) / count::Decimal)) <= 3.0:Decimal) AND (Case((count <= 1:Int64), null:Decimal, Sqrt(((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)) / (count - 1:Int64)::Decimal))) > 1.0:Decimal) } └─BatchOverWindow { window_functions: [sum($expr2) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), sum($expr1) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), count($expr1) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), sum($expr3) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), sum(t.x) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), count(t.x) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } └─BatchExchange { order: [t.z ASC, t.x ASC], dist: HashShard(t.z) } └─BatchSort { order: [t.z ASC, t.x ASC] } └─BatchProject { exprs: [t.x, t.y, t.z, ($expr1 * $expr1) as $expr2, $expr1, (t.x * t.x) as $expr3] } └─BatchProject { exprs: [t.x, t.y, t.z, (t.x - t.y) as $expr1] } - └─BatchFilter { predicate: (t.z > 0:Int32) AND (t.y > 0:Int32) } + └─BatchFilter { predicate: (t.z > 0:Int32) } └─BatchScan { table: t, columns: [t.x, t.y, t.z], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [x, y, z, res0, res1, t._row_id(hidden)], stream_key: [t._row_id, z], pk_columns: [t._row_id, z], pk_conflict: NoCheck } └─StreamProject { exprs: [t.x, t.y, t.z, Sqrt(((sum::Decimal - (($expr4 * $expr4) / $expr5)) / $expr5)) as $expr6, Case((count <= 1:Int64), null:Decimal, Sqrt(((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)) / (count - 1:Int64)::Decimal))) as $expr7, t._row_id] } └─StreamProject { exprs: [t.x, t.y, t.z, $expr2, $expr1, $expr3, sum, sum, count, sum, sum, count, sum::Decimal as $expr4, count::Decimal as $expr5, t._row_id] } - └─StreamFilter { predicate: (t.x > 0:Int32) AND (Sqrt(((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)) / count::Decimal)) <= 3.0:Decimal) AND (Case((count <= 1:Int64), null:Decimal, Sqrt(((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)) / (count - 1:Int64)::Decimal))) > 1.0:Decimal) } + └─StreamFilter { predicate: (t.y > 0:Int32) AND (t.x > 0:Int32) AND (Sqrt(((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)) / count::Decimal)) <= 3.0:Decimal) AND (Case((count <= 1:Int64), null:Decimal, Sqrt(((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)) / (count - 1:Int64)::Decimal))) > 1.0:Decimal) } └─StreamOverWindow { window_functions: [sum($expr2) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), sum($expr1) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), count($expr1) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), sum($expr3) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), sum(t.x) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), count(t.x) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } └─StreamExchange { dist: HashShard(t.z) } └─StreamProject { exprs: [t.x, t.y, t.z, ($expr1 * $expr1) as $expr2, $expr1, (t.x * t.x) as $expr3, t._row_id] } └─StreamProject { exprs: [t.x, t.y, t.z, (t.x - t.y) as $expr1, t._row_id] } - └─StreamFilter { predicate: (t.z > 0:Int32) AND (t.y > 0:Int32) } + └─StreamFilter { predicate: (t.z > 0:Int32) } └─StreamTableScan { table: t, columns: [t.x, t.y, t.z, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } - id: aggregate with expression in func arguments and over clause sql: | @@ -346,29 +346,27 @@ └─LogicalScan { table: t, columns: [t.x, t.y, t.z, t.w, t._row_id] } optimized_logical_plan_for_batch: |- LogicalProject { exprs: [t.x, t.y, t.z, (sum::Decimal / count::Decimal) as $expr4] } - └─LogicalFilter { predicate: ((sum::Decimal / count::Decimal) <= 3.0:Decimal) } + └─LogicalFilter { predicate: (t.z > 0:Int32) AND (t.y > 0:Int32) AND (t.x > 0:Int32) AND ((sum::Decimal / count::Decimal) <= 3.0:Decimal) } └─LogicalOverWindow { window_functions: [sum($expr1) OVER(PARTITION BY $expr2 ORDER BY $expr3 ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), count($expr1) OVER(PARTITION BY $expr2 ORDER BY $expr3 ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } └─LogicalProject { exprs: [t.x, t.y, t.z, (t.z * t.z) as $expr1, (t.y + 1:Int32) as $expr2, Abs(t.w) as $expr3] } - └─LogicalScan { table: t, columns: [t.x, t.y, t.z, t.w], predicate: (t.z > 0:Int32) AND (t.y > 0:Int32) AND (t.x > 0:Int32) } + └─LogicalScan { table: t, columns: [t.x, t.y, t.z, t.w] } batch_plan: |- BatchExchange { order: [], dist: Single } └─BatchProject { exprs: [t.x, t.y, t.z, (sum::Decimal / count::Decimal) as $expr4] } - └─BatchFilter { predicate: ((sum::Decimal / count::Decimal) <= 3.0:Decimal) } + └─BatchFilter { predicate: (t.z > 0:Int32) AND (t.y > 0:Int32) AND (t.x > 0:Int32) AND ((sum::Decimal / count::Decimal) <= 3.0:Decimal) } └─BatchOverWindow { window_functions: [sum($expr1) OVER(PARTITION BY $expr2 ORDER BY $expr3 ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), count($expr1) OVER(PARTITION BY $expr2 ORDER BY $expr3 ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } └─BatchExchange { order: [$expr2 ASC, $expr3 ASC], dist: HashShard($expr2) } └─BatchSort { order: [$expr2 ASC, $expr3 ASC] } └─BatchProject { exprs: [t.x, t.y, t.z, (t.z * t.z) as $expr1, (t.y + 1:Int32) as $expr2, Abs(t.w) as $expr3] } - └─BatchFilter { predicate: (t.z > 0:Int32) AND (t.y > 0:Int32) AND (t.x > 0:Int32) } - └─BatchScan { table: t, columns: [t.x, t.y, t.z, t.w], distribution: SomeShard } + └─BatchScan { table: t, columns: [t.x, t.y, t.z, t.w], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [x, y, z, res, t._row_id(hidden), $expr2(hidden)], stream_key: [t._row_id, $expr2], pk_columns: [t._row_id, $expr2], pk_conflict: NoCheck } └─StreamProject { exprs: [t.x, t.y, t.z, (sum::Decimal / count::Decimal) as $expr4, t._row_id, $expr2] } - └─StreamFilter { predicate: ((sum::Decimal / count::Decimal) <= 3.0:Decimal) } + └─StreamFilter { predicate: (t.z > 0:Int32) AND (t.y > 0:Int32) AND (t.x > 0:Int32) AND ((sum::Decimal / count::Decimal) <= 3.0:Decimal) } └─StreamOverWindow { window_functions: [sum($expr1) OVER(PARTITION BY $expr2 ORDER BY $expr3 ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), count($expr1) OVER(PARTITION BY $expr2 ORDER BY $expr3 ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } └─StreamExchange { dist: HashShard($expr2) } └─StreamProject { exprs: [t.x, t.y, t.z, (t.z * t.z) as $expr1, (t.y + 1:Int32) as $expr2, Abs(t.w) as $expr3, t._row_id] } - └─StreamFilter { predicate: (t.z > 0:Int32) AND (t.y > 0:Int32) AND (t.x > 0:Int32) } - └─StreamTableScan { table: t, columns: [t.x, t.y, t.z, t.w, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamTableScan { table: t, columns: [t.x, t.y, t.z, t.w, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } - id: row_number with empty over clause sql: | create table t(x int); @@ -771,30 +769,31 @@ └─LogicalScan { table: t, columns: [t.x, t.y, t.z, t._row_id] } optimized_logical_plan_for_stream: |- LogicalProject { exprs: [row_number] } - └─LogicalTopN { order: [t.z ASC], limit: 9, offset: 0, with_ties: true, group_key: [t.x, t.y] } - └─LogicalTopN { order: [t.z ASC], limit: 9, offset: 0, group_key: [t.x, t.y] } - └─LogicalOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.y ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } - └─LogicalTopN { order: [t.y ASC], limit: 9, offset: 0, group_key: [t.x] } + └─LogicalFilter { predicate: (row_number < 10:Int32) AND (row_number < 10:Int32) } + └─LogicalTopN { order: [t.z ASC], limit: 9, offset: 0, with_ties: true, group_key: [t.x, t.y] } + └─LogicalOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } + └─LogicalOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.y ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } └─LogicalScan { table: t, columns: [t.x, t.y, t.z] } batch_plan: |- BatchExchange { order: [], dist: Single } └─BatchProject { exprs: [row_number] } - └─BatchGroupTopN { order: [t.z ASC], limit: 9, offset: 0, with_ties: true, group_key: [t.x, t.y] } - └─BatchGroupTopN { order: [t.z ASC], limit: 9, offset: 0, group_key: [t.x, t.y] } + └─BatchFilter { predicate: (row_number < 10:Int32) AND (row_number < 10:Int32) } + └─BatchGroupTopN { order: [t.z ASC], limit: 9, offset: 0, with_ties: true, group_key: [t.x, t.y] } └─BatchExchange { order: [], dist: HashShard(t.x, t.y) } - └─BatchOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.y ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } - └─BatchSort { order: [t.x ASC, t.y ASC] } - └─BatchGroupTopN { order: [t.y ASC], limit: 9, offset: 0, group_key: [t.x] } - └─BatchExchange { order: [], dist: HashShard(t.x) } - └─BatchScan { table: t, columns: [t.x, t.y, t.z], distribution: SomeShard } + └─BatchOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } + └─BatchSort { order: [t.x ASC, t.y ASC, t.z ASC] } + └─BatchOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.y ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } + └─BatchExchange { order: [t.x ASC, t.y ASC], dist: HashShard(t.x) } + └─BatchSort { order: [t.x ASC, t.y ASC] } + └─BatchScan { table: t, columns: [t.x, t.y, t.z], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [r1, t.x(hidden), t.y(hidden), t._row_id(hidden)], stream_key: [t.x, t.y, t._row_id], pk_columns: [t.x, t.y, t._row_id], pk_conflict: NoCheck } └─StreamProject { exprs: [row_number, t.x, t.y, t._row_id] } - └─StreamGroupTopN { order: [t.z ASC], limit: 9, offset: 0, with_ties: true, group_key: [t.x, t.y] } - └─StreamGroupTopN { order: [t.z ASC], limit: 9, offset: 0, group_key: [t.x, t.y] } + └─StreamFilter { predicate: (row_number < 10:Int32) AND (row_number < 10:Int32) } + └─StreamGroupTopN { order: [t.z ASC], limit: 9, offset: 0, with_ties: true, group_key: [t.x, t.y] } └─StreamExchange { dist: HashShard(t.x, t.y) } - └─StreamOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.y ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } - └─StreamGroupTopN { order: [t.y ASC], limit: 9, offset: 0, group_key: [t.x] } + └─StreamOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } + └─StreamOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.y ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } └─StreamExchange { dist: HashShard(t.x) } └─StreamTableScan { table: t, columns: [t.x, t.y, t.z, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } - id: TopN among multiple window function calls, some not TopN @@ -819,24 +818,28 @@ └─LogicalScan { table: t, columns: [t.x, t.y, t.z, t._row_id] } optimized_logical_plan_for_stream: |- LogicalProject { exprs: [row_number, rank] } - └─LogicalOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), rank() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } - └─LogicalTopN { order: [t.y ASC], limit: 9, offset: 0, group_key: [t.x] } - └─LogicalScan { table: t, columns: [t.x, t.y, t.z] } + └─LogicalFilter { predicate: (row_number < 10:Int32) } + └─LogicalOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), rank() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } + └─LogicalOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.y ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } + └─LogicalScan { table: t, columns: [t.x, t.y, t.z] } batch_plan: |- BatchExchange { order: [], dist: Single } └─BatchProject { exprs: [row_number, rank] } - └─BatchOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), rank() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } - └─BatchSort { order: [t.x ASC, t.y ASC, t.z ASC] } - └─BatchGroupTopN { order: [t.y ASC], limit: 9, offset: 0, group_key: [t.x] } - └─BatchExchange { order: [], dist: HashShard(t.x) } - └─BatchScan { table: t, columns: [t.x, t.y, t.z], distribution: SomeShard } + └─BatchFilter { predicate: (row_number < 10:Int32) } + └─BatchOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), rank() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } + └─BatchSort { order: [t.x ASC, t.y ASC, t.z ASC] } + └─BatchOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.y ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } + └─BatchExchange { order: [t.x ASC, t.y ASC], dist: HashShard(t.x) } + └─BatchSort { order: [t.x ASC, t.y ASC] } + └─BatchScan { table: t, columns: [t.x, t.y, t.z], distribution: SomeShard } stream_plan: |- - StreamMaterialize { columns: [r2, r3, t.x(hidden), t._row_id(hidden), t.y(hidden)], stream_key: [t.x, t._row_id, t.y], pk_columns: [t.x, t._row_id, t.y], pk_conflict: NoCheck } - └─StreamProject { exprs: [row_number, rank, t.x, t._row_id, t.y] } - └─StreamOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), rank() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } - └─StreamGroupTopN { order: [t.y ASC], limit: 9, offset: 0, group_key: [t.x] } - └─StreamExchange { dist: HashShard(t.x) } - └─StreamTableScan { table: t, columns: [t.x, t.y, t.z, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + StreamMaterialize { columns: [r2, r3, t._row_id(hidden), t.x(hidden), t.y(hidden)], stream_key: [t._row_id, t.x, t.y], pk_columns: [t._row_id, t.x, t.y], pk_conflict: NoCheck } + └─StreamProject { exprs: [row_number, rank, t._row_id, t.x, t.y] } + └─StreamFilter { predicate: (row_number < 10:Int32) } + └─StreamOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), rank() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } + └─StreamOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.y ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } + └─StreamExchange { dist: HashShard(t.x) } + └─StreamTableScan { table: t, columns: [t.x, t.y, t.z, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } - id: create_bid sql: | /*