Skip to content

Commit

Permalink
feat: physical optimizer RemoveDuplicate to remove duplicate exec pla…
Browse files Browse the repository at this point in the history
…ns (#3839)

* feat: physical optimizer RemoveDuplicate to remove duplicate exec plans

Signed-off-by: Ruihang Xia <[email protected]>

* update document

Signed-off-by: Ruihang Xia <[email protected]>

* update sqlness results

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored Apr 30, 2024
1 parent aba5e41 commit a0f4881
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 13 deletions.
1 change: 1 addition & 0 deletions src/common/query/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ impl OutputMeta {
}

pub use datafusion::physical_plan::ExecutionPlan as DfPhysicalPlan;
pub type DfPhysicalPlanRef = Arc<dyn DfPhysicalPlan>;

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)]
pub enum AddColumnLocation {
Expand Down
1 change: 1 addition & 0 deletions src/query/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

pub mod order_hint;
pub mod remove_duplicate;
pub mod string_normalization;
#[cfg(test)]
mod test_util;
Expand Down
117 changes: 117 additions & 0 deletions src/query/src/optimizer/remove_duplicate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_query::DfPhysicalPlanRef;
use datafusion::config::ConfigOptions;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::Result as DfResult;

/// This is [PhysicalOptimizerRule] to remove duplicate physical plans such as two
/// adjoining [CoalesceBatchesExec] or [RepartitionExec]. They won't have any effect
/// if one runs right after another.
///
/// This rule is expected to be run in the final stage of the optimization process.
pub struct RemoveDuplicate;

impl PhysicalOptimizerRule for RemoveDuplicate {
fn optimize(
&self,
plan: DfPhysicalPlanRef,
_config: &ConfigOptions,
) -> DfResult<DfPhysicalPlanRef> {
Self::do_optimize(plan)
}

fn name(&self) -> &str {
"RemoveDuplicateRule"
}

fn schema_check(&self) -> bool {
false
}
}

impl RemoveDuplicate {
fn do_optimize(plan: DfPhysicalPlanRef) -> DfResult<DfPhysicalPlanRef> {
let result = plan
.transform_down_mut(&mut |plan| {
if plan.as_any().is::<CoalesceBatchesExec>()
|| plan.as_any().is::<RepartitionExec>()
{
// check child
let child = plan.children()[0].clone();
if child.as_any().type_id() == plan.as_any().type_id() {
// remove child
let grand_child = child.children()[0].clone();
let new_plan = plan.with_new_children(vec![grand_child])?;
return Ok(Transformed::yes(new_plan));
}
}

Ok(Transformed::no(plan))
})?
.data;

Ok(result)
}
}

#[cfg(test)]
mod test {
use std::sync::Arc;

use arrow_schema::Schema;
use datafusion::physical_plan::displayable;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion_physical_expr::Partitioning;

use super::*;

#[test]
fn remove_coalesce_batches() {
let empty = Arc::new(EmptyExec::new(Arc::new(Schema::empty())));
let coalesce_batches = Arc::new(CoalesceBatchesExec::new(empty, 1024));
let another_coalesce_batches = Arc::new(CoalesceBatchesExec::new(coalesce_batches, 8192));

let optimized = RemoveDuplicate::do_optimize(another_coalesce_batches).unwrap();
let formatted = displayable(optimized.as_ref()).indent(true).to_string();
let expected = "CoalesceBatchesExec: target_batch_size=8192\
\n EmptyExec\n";

assert_eq!(expected, formatted);
}

#[test]
fn non_continuous_coalesce_batches() {
let empty = Arc::new(EmptyExec::new(Arc::new(Schema::empty())));
let coalesce_batches = Arc::new(CoalesceBatchesExec::new(empty, 1024));
let repartition = Arc::new(
RepartitionExec::try_new(coalesce_batches, Partitioning::UnknownPartitioning(1))
.unwrap(),
);
let another_coalesce_batches = Arc::new(CoalesceBatchesExec::new(repartition, 8192));

let optimized = RemoveDuplicate::do_optimize(another_coalesce_batches).unwrap();
let formatted = displayable(optimized.as_ref()).indent(true).to_string();
let expected = "CoalesceBatchesExec: target_batch_size=8192\
\n RepartitionExec: partitioning=UnknownPartitioning(1), input_partitions=1\
\n CoalesceBatchesExec: target_batch_size=1024\
\n EmptyExec\n";

assert_eq!(expected, formatted);
}
}
8 changes: 7 additions & 1 deletion src/query/src/query_engine/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use datafusion::dataframe::DataFrame;
use datafusion::error::Result as DfResult;
use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionState};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
use datafusion_expr::LogicalPlan as DfLogicalPlan;
Expand All @@ -43,6 +44,7 @@ use table::TableRef;

use crate::dist_plan::{DistExtensionPlanner, DistPlannerAnalyzer};
use crate::optimizer::order_hint::OrderHintRule;
use crate::optimizer::remove_duplicate::RemoveDuplicate;
use crate::optimizer::string_normalization::StringNormalizationRule;
use crate::optimizer::type_conversion::TypeConversionRule;
use crate::optimizer::ExtensionAnalyzerRule;
Expand Down Expand Up @@ -99,6 +101,9 @@ impl QueryEngineState {
}
let mut optimizer = Optimizer::new();
optimizer.rules.push(Arc::new(OrderHintRule));
// add physical optimizer
let mut physical_optimizer = PhysicalOptimizer::new();
physical_optimizer.rules.push(Arc::new(RemoveDuplicate));

let session_state = SessionState::new_with_config_rt(session_config, runtime_env)
.with_serializer_registry(Arc::new(ExtensionSerializer))
Expand All @@ -107,7 +112,8 @@ impl QueryEngineState {
catalog_list.clone(),
region_query_handler,
)))
.with_optimizer_rules(optimizer.rules);
.with_optimizer_rules(optimizer.rules)
.with_physical_optimizer_rules(physical_optimizer.rules);

let df_context = SessionContext::new_with_state(session_state);

Expand Down
8 changes: 0 additions & 8 deletions tests/cases/distributed/explain/join_10_tables.result
Original file line number Diff line number Diff line change
Expand Up @@ -132,35 +132,27 @@ limit 1;
|_|_MergeScanExec: REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_MergeScanExec: REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_MergeScanExec: REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_MergeScanExec: REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_MergeScanExec: REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_MergeScanExec: REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_MergeScanExec: REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_MergeScanExec: REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_MergeScanExec: REDACTED
|_|_|
+-+-+
Expand Down
4 changes: 0 additions & 4 deletions tests/cases/distributed/explain/subqueries.result
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@ EXPLAIN SELECT * FROM integers WHERE i IN ((SELECT i FROM integers)) ORDER BY i;
|_|_REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_MergeScanExec: REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_MergeScanExec: REDACTED
|_|_|
+-+-+
Expand Down Expand Up @@ -58,11 +56,9 @@ EXPLAIN SELECT * FROM integers i1 WHERE EXISTS(SELECT i FROM integers WHERE i=i1
|_|_REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_MergeScanExec: REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_ProjectionExec: expr=[i@0 as i]_|
|_|_MergeScanExec: REDACTED
|_|_|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test;
| physical_plan after PipelineChecker_| SAME TEXT AS ABOVE_|
| physical_plan after LimitAggregation_| SAME TEXT AS ABOVE_|
| physical_plan after ProjectionPushdown_| SAME TEXT AS ABOVE_|
| physical_plan after RemoveDuplicateRule_| SAME TEXT AS ABOVE_|
| physical_plan_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_|
|_|_PromSeriesDivideExec: tags=["k"]_|
Expand Down

0 comments on commit a0f4881

Please sign in to comment.