Skip to content

Commit

Permalink
fix: dist planner has wrong behavior in table with multiple partitions (
Browse files Browse the repository at this point in the history
#2237)

* fix: dist planner has wrong behavior in table with multiple partitions

Signed-off-by: Ruihang Xia <[email protected]>

* Update tests/cases/distributed/explain/multi_partitions.sql

Co-authored-by: Zhenchi <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
Co-authored-by: Zhenchi <[email protected]>
  • Loading branch information
waynexia and zhongzc authored Aug 23, 2023
1 parent 4dbc32f commit d581688
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 9 deletions.
11 changes: 8 additions & 3 deletions src/query/src/dist_plan/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ impl ExpandState {
}
}

#[derive(Debug)]
struct CommutativeVisitor {
next_stage: Vec<LogicalPlan>,
// hash of the stop node
Expand Down Expand Up @@ -325,7 +326,7 @@ mod test {
let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap();
let expected = [
"Aggregate: groupBy=[[]], aggr=[[AVG(t.number)]]",
" TableScan: t",
" MergeScan [is_placeholder=false]",
]
.join("\n");
assert_eq!(expected, format!("{:?}", result));
Expand All @@ -352,7 +353,7 @@ mod test {
let expected = [
"Sort: t.number ASC NULLS LAST",
" Distinct:",
" TableScan: t",
" MergeScan [is_placeholder=false]",
]
.join("\n");
assert_eq!(expected, format!("{:?}", result));
Expand All @@ -374,7 +375,11 @@ mod test {

let config = ConfigOptions::default();
let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap();
let expected = ["Limit: skip=0, fetch=1", " TableScan: t"].join("\n");
let expected = [
"Limit: skip=0, fetch=1",
" MergeScan [is_placeholder=false]",
]
.join("\n");
assert_eq!(expected, format!("{:?}", result));
}
}
2 changes: 1 addition & 1 deletion src/query/src/dist_plan/commutativity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl Categorizer {
Commutativity::Unimplemented
}
LogicalPlan::Union(_) => Commutativity::Unimplemented,
LogicalPlan::TableScan(_) => Commutativity::CheckPartition,
LogicalPlan::TableScan(_) => Commutativity::Commutative,
LogicalPlan::EmptyRelation(_) => Commutativity::NonCommutative,
LogicalPlan::Subquery(_) => Commutativity::Unimplemented,
LogicalPlan::SubqueryAlias(_) => Commutativity::Unimplemented,
Expand Down
2 changes: 2 additions & 0 deletions tests-integration/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ mod tests {

#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_exec_sql() {
common_telemetry::init_default_ut_logging();

let distributed = tests::create_distributed_instance("test_distributed_exec_sql").await;
let frontend = distributed.frontend();
let instance = frontend.as_ref();
Expand Down
38 changes: 38 additions & 0 deletions tests/cases/distributed/explain/multi_partitions.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
CREATE TABLE demo(
host STRING,
ts TIMESTAMP,
cpu DOUBLE NULL,
memory DOUBLE NULL,
disk_util DOUBLE DEFAULT 9.9,
TIME INDEX (ts),
PRIMARY KEY(host)
)
PARTITION BY RANGE COLUMNS (host) (
PARTITION r0 VALUES LESS THAN ('550-A'),
PARTITION r1 VALUES LESS THAN ('550-W'),
PARTITION r2 VALUES LESS THAN (MAXVALUE),
);

Affected Rows: 0

-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (peer-.*) REDACTED
explain SELECT * FROM demo WHERE ts > cast(1000000000 as timestamp) ORDER BY host;

+-+-+
| plan_type_| plan_|
+-+-+
| logical_plan_| Sort: demo.host ASC NULLS LAST_|
|_|_MergeScan [is_placeholder=false]_|
| physical_plan | SortExec: expr=[host@0 ASC NULLS LAST]_|
|_|_MergeScanExec: peers=[REDACTED
|_|_|
+-+-+

drop table demo;

Affected Rows: 1

23 changes: 23 additions & 0 deletions tests/cases/distributed/explain/multi_partitions.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
CREATE TABLE demo(
host STRING,
ts TIMESTAMP,
cpu DOUBLE NULL,
memory DOUBLE NULL,
disk_util DOUBLE DEFAULT 9.9,
TIME INDEX (ts),
PRIMARY KEY(host)
)
PARTITION BY RANGE COLUMNS (host) (
PARTITION r0 VALUES LESS THAN ('550-A'),
PARTITION r1 VALUES LESS THAN ('550-W'),
PARTITION r2 VALUES LESS THAN (MAXVALUE),
);

-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (peer-.*) REDACTED
explain SELECT * FROM demo WHERE ts > cast(1000000000 as timestamp) ORDER BY host;

drop table demo;
10 changes: 5 additions & 5 deletions tests/cases/distributed/optimizer/order_by.result
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ explain select * from numbers;
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | TableScan: numbers projection=[number] |
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Expand All @@ -14,7 +14,7 @@ explain select * from numbers order by number desc;
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: numbers.number DESC NULLS FIRST |
| | TableScan: numbers projection=[number] |
| | MergeScan [is_placeholder=false] |
| physical_plan | SortExec: expr=[number@0 DESC] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |
| | |
Expand All @@ -26,7 +26,7 @@ explain select * from numbers order by number asc;
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: numbers.number ASC NULLS LAST |
| | TableScan: numbers projection=[number] |
| | MergeScan [is_placeholder=false] |
| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |
| | |
Expand All @@ -39,7 +39,7 @@ explain select * from numbers order by number desc limit 10;
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Limit: skip=0, fetch=10 |
| | Sort: numbers.number DESC NULLS FIRST, fetch=10 |
| | TableScan: numbers projection=[number] |
| | MergeScan [is_placeholder=false] |
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | SortExec: fetch=10, expr=[number@0 DESC] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |
Expand All @@ -53,7 +53,7 @@ explain select * from numbers order by number asc limit 10;
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Limit: skip=0, fetch=10 |
| | Sort: numbers.number ASC NULLS LAST, fetch=10 |
| | TableScan: numbers projection=[number] |
| | MergeScan [is_placeholder=false] |
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | SortExec: fetch=10, expr=[number@0 ASC NULLS LAST] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |
Expand Down

0 comments on commit d581688

Please sign in to comment.