Skip to content

Commit

Permalink
feat: dist plan optimize part 2 (#2543)
Browse files Browse the repository at this point in the history
* allow udf and scalar fn

Signed-off-by: Ruihang Xia <[email protected]>

* put CountWildcardRule before dist planner

Signed-off-by: Ruihang Xia <[email protected]>

* bump datafusion to fix first_value/last_value

Signed-off-by: Ruihang Xia <[email protected]>

* update sqlness result

Signed-off-by: Ruihang Xia <[email protected]>

* use retain instead

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored Oct 9, 2023
1 parent 17b385a commit d5e4fca
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 39 deletions.
18 changes: 9 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ arrow-schema = { version = "43.0", features = ["serde"] }
async-stream = "0.3"
async-trait = "0.1"
chrono = { version = "0.4", features = ["serde"] }
datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "c0b0fca548e99d020c76e1a1cd7132aab26000e1" }
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "c0b0fca548e99d020c76e1a1cd7132aab26000e1" }
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "c0b0fca548e99d020c76e1a1cd7132aab26000e1" }
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "c0b0fca548e99d020c76e1a1cd7132aab26000e1" }
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "c0b0fca548e99d020c76e1a1cd7132aab26000e1" }
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "c0b0fca548e99d020c76e1a1cd7132aab26000e1" }
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "c0b0fca548e99d020c76e1a1cd7132aab26000e1" }
datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b6f3b28b6fe91924cc8dd3d83726b766f2a706ec" }
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b6f3b28b6fe91924cc8dd3d83726b766f2a706ec" }
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b6f3b28b6fe91924cc8dd3d83726b766f2a706ec" }
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b6f3b28b6fe91924cc8dd3d83726b766f2a706ec" }
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b6f3b28b6fe91924cc8dd3d83726b766f2a706ec" }
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b6f3b28b6fe91924cc8dd3d83726b766f2a706ec" }
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b6f3b28b6fe91924cc8dd3d83726b766f2a706ec" }
derive_builder = "0.12"
futures = "0.3"
futures-util = "0.3"
Expand Down
6 changes: 3 additions & 3 deletions src/query/src/dist_plan/commutativity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ impl Categorizer {
| Expr::Negative(_)
| Expr::Between(_)
| Expr::Sort(_)
| Expr::Exists(_) => Commutativity::Commutative,
| Expr::Exists(_)
| Expr::ScalarFunction(_)
| Expr::ScalarUDF(_) => Commutativity::Commutative,

Expr::Like(_)
| Expr::SimilarTo(_)
Expand All @@ -150,8 +152,6 @@ impl Categorizer {
| Expr::Case(_)
| Expr::Cast(_)
| Expr::TryCast(_)
| Expr::ScalarFunction(_)
| Expr::ScalarUDF(_)
| Expr::AggregateFunction(_)
| Expr::WindowFunction(_)
| Expr::AggregateUDF(_)
Expand Down
20 changes: 9 additions & 11 deletions src/query/src/query_engine/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
use datafusion_expr::LogicalPlan as DfLogicalPlan;
use datafusion_optimizer::analyzer::Analyzer;
use datafusion_optimizer::analyzer::count_wildcard_rule::CountWildcardRule;
use datafusion_optimizer::analyzer::{Analyzer, AnalyzerRule};
use datafusion_optimizer::optimizer::Optimizer;
use promql::extension_plan::PromExtensionPlanner;
use substrait::extension_serializer::ExtensionSerializer;
Expand Down Expand Up @@ -88,6 +89,8 @@ impl QueryEngineState {
}
analyzer.rules.insert(0, Arc::new(TypeConversionRule));
analyzer.rules.insert(0, Arc::new(StringNormalizationRule));
Self::remove_analyzer_rule(&mut analyzer.rules, CountWildcardRule {}.name());
analyzer.rules.insert(0, Arc::new(CountWildcardRule {}));
let mut optimizer = Optimizer::new();
optimizer.rules.push(Arc::new(OrderHintRule));

Expand Down Expand Up @@ -132,20 +135,15 @@ impl QueryEngineState {
}
}

fn remove_analyzer_rule(rules: &mut Vec<Arc<dyn AnalyzerRule + Send + Sync>>, name: &str) {
rules.retain(|rule| rule.name() != name);
}

fn remove_physical_optimize_rule(
rules: &mut Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
name: &str,
) {
let mut index_to_move = None;
for (i, rule) in rules.iter().enumerate() {
if rule.name() == name {
index_to_move = Some(i);
break;
}
}
if let Some(index) = index_to_move {
let _ = rules.remove(index);
}
rules.retain(|rule| rule.name() != name);
}

/// Register a udf function
Expand Down
11 changes: 2 additions & 9 deletions tests/cases/distributed/explain/single_partition.result
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,8 @@ EXPLAIN SELECT COUNT(*) FROM single_partition;
+-+-+
| plan_type_| plan_|
+-+-+
| logical_plan_| Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]_|
|_|_Projection:_|
|_|_MergeScan [is_placeholder=false]_|
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]_|
|_|_CoalescePartitionsExec_|
|_|_AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_ProjectionExec: expr=[]_|
|_|_MergeScanExec: REDACTED
| logical_plan_| MergeScan [is_placeholder=false]_|
| physical_plan | MergeScanExec: REDACTED
|_|_|
+-+-+

Expand Down

0 comments on commit d5e4fca

Please sign in to comment.