diff --git a/src/common/query/src/lib.rs b/src/common/query/src/lib.rs index 3686cff83699..09d4d411bbfc 100644 --- a/src/common/query/src/lib.rs +++ b/src/common/query/src/lib.rs @@ -119,6 +119,7 @@ impl OutputMeta { } pub use datafusion::physical_plan::ExecutionPlan as DfPhysicalPlan; +pub type DfPhysicalPlanRef = Arc<dyn DfPhysicalPlan>; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)] pub enum AddColumnLocation { diff --git a/src/query/src/optimizer.rs b/src/query/src/optimizer.rs index da95851b87a6..26b906fd985d 100644 --- a/src/query/src/optimizer.rs +++ b/src/query/src/optimizer.rs @@ -13,6 +13,7 @@ // limitations under the License. pub mod order_hint; +pub mod remove_duplicate; pub mod string_normalization; #[cfg(test)] mod test_util; diff --git a/src/query/src/optimizer/remove_duplicate.rs b/src/query/src/optimizer/remove_duplicate.rs new file mode 100644 index 000000000000..10973e5ed5ee --- /dev/null +++ b/src/query/src/optimizer/remove_duplicate.rs @@ -0,0 +1,117 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use common_query::DfPhysicalPlanRef; +use datafusion::config::ConfigOptions; +use datafusion::physical_optimizer::PhysicalOptimizerRule; +use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; +use datafusion::physical_plan::repartition::RepartitionExec; +use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::Result as DfResult; + +/// This is [PhysicalOptimizerRule] to remove duplicate physical plans such as two +/// adjoining [CoalesceBatchesExec] or [RepartitionExec]. They won't have any effect +/// if one runs right after another. +/// +/// This rule is expected to be run in the final stage of the optimization process. +pub struct RemoveDuplicate; + +impl PhysicalOptimizerRule for RemoveDuplicate { + fn optimize( + &self, + plan: DfPhysicalPlanRef, + _config: &ConfigOptions, + ) -> DfResult<DfPhysicalPlanRef> { + Self::do_optimize(plan) + } + + fn name(&self) -> &str { + "RemoveDuplicateRule" + } + + fn schema_check(&self) -> bool { + false + } +} + +impl RemoveDuplicate { + fn do_optimize(plan: DfPhysicalPlanRef) -> DfResult<DfPhysicalPlanRef> { + let result = plan + .transform_down_mut(&mut |plan| { + if plan.as_any().is::<CoalesceBatchesExec>() + || plan.as_any().is::<RepartitionExec>() + { + // check child + let child = plan.children()[0].clone(); + if child.as_any().type_id() == plan.as_any().type_id() { + // remove child + let grand_child = child.children()[0].clone(); + let new_plan = plan.with_new_children(vec![grand_child])?; + return Ok(Transformed::yes(new_plan)); + } + } + + Ok(Transformed::no(plan)) + })? 
+ .data; + + Ok(result) + } +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use arrow_schema::Schema; + use datafusion::physical_plan::displayable; + use datafusion::physical_plan::empty::EmptyExec; + use datafusion_physical_expr::Partitioning; + + use super::*; + + #[test] + fn remove_coalesce_batches() { + let empty = Arc::new(EmptyExec::new(Arc::new(Schema::empty()))); + let coalesce_batches = Arc::new(CoalesceBatchesExec::new(empty, 1024)); + let another_coalesce_batches = Arc::new(CoalesceBatchesExec::new(coalesce_batches, 8192)); + + let optimized = RemoveDuplicate::do_optimize(another_coalesce_batches).unwrap(); + let formatted = displayable(optimized.as_ref()).indent(true).to_string(); + let expected = "CoalesceBatchesExec: target_batch_size=8192\ + \n EmptyExec\n"; + + assert_eq!(expected, formatted); + } + + #[test] + fn non_continuous_coalesce_batches() { + let empty = Arc::new(EmptyExec::new(Arc::new(Schema::empty()))); + let coalesce_batches = Arc::new(CoalesceBatchesExec::new(empty, 1024)); + let repartition = Arc::new( + RepartitionExec::try_new(coalesce_batches, Partitioning::UnknownPartitioning(1)) + .unwrap(), + ); + let another_coalesce_batches = Arc::new(CoalesceBatchesExec::new(repartition, 8192)); + + let optimized = RemoveDuplicate::do_optimize(another_coalesce_batches).unwrap(); + let formatted = displayable(optimized.as_ref()).indent(true).to_string(); + let expected = "CoalesceBatchesExec: target_batch_size=8192\ + \n RepartitionExec: partitioning=UnknownPartitioning(1), input_partitions=1\ + \n CoalesceBatchesExec: target_batch_size=1024\ + \n EmptyExec\n"; + + assert_eq!(expected, formatted); + } +} diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index 96e88255188f..bf2c01de33ed 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -30,6 +30,7 @@ use datafusion::dataframe::DataFrame; use datafusion::error::Result as DfResult; use 
datafusion::execution::context::{QueryPlanner, SessionConfig, SessionState}; use datafusion::execution::runtime_env::RuntimeEnv; +use datafusion::physical_optimizer::optimizer::PhysicalOptimizer; use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner}; use datafusion_expr::LogicalPlan as DfLogicalPlan; @@ -43,6 +44,7 @@ use table::TableRef; use crate::dist_plan::{DistExtensionPlanner, DistPlannerAnalyzer}; use crate::optimizer::order_hint::OrderHintRule; +use crate::optimizer::remove_duplicate::RemoveDuplicate; use crate::optimizer::string_normalization::StringNormalizationRule; use crate::optimizer::type_conversion::TypeConversionRule; use crate::optimizer::ExtensionAnalyzerRule; @@ -99,6 +101,9 @@ impl QueryEngineState { } let mut optimizer = Optimizer::new(); optimizer.rules.push(Arc::new(OrderHintRule)); + // add physical optimizer + let mut physical_optimizer = PhysicalOptimizer::new(); + physical_optimizer.rules.push(Arc::new(RemoveDuplicate)); let session_state = SessionState::new_with_config_rt(session_config, runtime_env) .with_serializer_registry(Arc::new(ExtensionSerializer)) @@ -107,7 +112,8 @@ impl QueryEngineState { catalog_list.clone(), region_query_handler, ))) - .with_optimizer_rules(optimizer.rules); + .with_optimizer_rules(optimizer.rules) + .with_physical_optimizer_rules(physical_optimizer.rules); let df_context = SessionContext::new_with_state(session_state); diff --git a/tests/cases/distributed/explain/join_10_tables.result b/tests/cases/distributed/explain/join_10_tables.result index 8b40f21fc66f..f44fc41fa79a 100644 --- a/tests/cases/distributed/explain/join_10_tables.result +++ b/tests/cases/distributed/explain/join_10_tables.result @@ -132,35 +132,27 @@ limit 1; |_|_MergeScanExec: REDACTED |_|_CoalesceBatchesExec: target_batch_size=8192_| |_|_RepartitionExec: partitioning=REDACTED -|_|_RepartitionExec: partitioning=REDACTED |_|_MergeScanExec: REDACTED 
|_|_CoalesceBatchesExec: target_batch_size=8192_| |_|_RepartitionExec: partitioning=REDACTED -|_|_RepartitionExec: partitioning=REDACTED |_|_MergeScanExec: REDACTED |_|_CoalesceBatchesExec: target_batch_size=8192_| |_|_RepartitionExec: partitioning=REDACTED -|_|_RepartitionExec: partitioning=REDACTED |_|_MergeScanExec: REDACTED |_|_CoalesceBatchesExec: target_batch_size=8192_| |_|_RepartitionExec: partitioning=REDACTED -|_|_RepartitionExec: partitioning=REDACTED |_|_MergeScanExec: REDACTED |_|_CoalesceBatchesExec: target_batch_size=8192_| |_|_RepartitionExec: partitioning=REDACTED -|_|_RepartitionExec: partitioning=REDACTED |_|_MergeScanExec: REDACTED |_|_CoalesceBatchesExec: target_batch_size=8192_| |_|_RepartitionExec: partitioning=REDACTED -|_|_RepartitionExec: partitioning=REDACTED |_|_MergeScanExec: REDACTED |_|_CoalesceBatchesExec: target_batch_size=8192_| |_|_RepartitionExec: partitioning=REDACTED -|_|_RepartitionExec: partitioning=REDACTED |_|_MergeScanExec: REDACTED |_|_CoalesceBatchesExec: target_batch_size=8192_| |_|_RepartitionExec: partitioning=REDACTED -|_|_RepartitionExec: partitioning=REDACTED |_|_MergeScanExec: REDACTED |_|_| +-+-+ diff --git a/tests/cases/distributed/explain/subqueries.result b/tests/cases/distributed/explain/subqueries.result index 593e745e1d99..ad31923c97fe 100644 --- a/tests/cases/distributed/explain/subqueries.result +++ b/tests/cases/distributed/explain/subqueries.result @@ -25,11 +25,9 @@ EXPLAIN SELECT * FROM integers WHERE i IN ((SELECT i FROM integers)) ORDER BY i; |_|_REDACTED |_|_CoalesceBatchesExec: target_batch_size=8192_| |_|_RepartitionExec: partitioning=REDACTED -|_|_RepartitionExec: partitioning=REDACTED |_|_MergeScanExec: REDACTED |_|_CoalesceBatchesExec: target_batch_size=8192_| |_|_RepartitionExec: partitioning=REDACTED -|_|_RepartitionExec: partitioning=REDACTED |_|_MergeScanExec: REDACTED |_|_| +-+-+ @@ -58,11 +56,9 @@ EXPLAIN SELECT * FROM integers i1 WHERE EXISTS(SELECT i FROM integers WHERE i=i1 
|_|_REDACTED |_|_CoalesceBatchesExec: target_batch_size=8192_| |_|_RepartitionExec: partitioning=REDACTED -|_|_RepartitionExec: partitioning=REDACTED |_|_MergeScanExec: REDACTED |_|_CoalesceBatchesExec: target_batch_size=8192_| |_|_RepartitionExec: partitioning=REDACTED -|_|_RepartitionExec: partitioning=REDACTED |_|_ProjectionExec: expr=[i@0 as i]_| |_|_MergeScanExec: REDACTED |_|_| diff --git a/tests/cases/standalone/common/tql-explain-analyze/explain.result b/tests/cases/standalone/common/tql-explain-analyze/explain.result index eb6be427d30c..024c3f80de60 100644 --- a/tests/cases/standalone/common/tql-explain-analyze/explain.result +++ b/tests/cases/standalone/common/tql-explain-analyze/explain.result @@ -165,6 +165,7 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test; | physical_plan after PipelineChecker_| SAME TEXT AS ABOVE_| | physical_plan after LimitAggregation_| SAME TEXT AS ABOVE_| | physical_plan after ProjectionPushdown_| SAME TEXT AS ABOVE_| +| physical_plan after RemoveDuplicateRule_| SAME TEXT AS ABOVE_| | physical_plan_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_| |_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_| |_|_PromSeriesDivideExec: tags=["k"]_|