diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs index 74184d523985..66405212e6fa 100644 --- a/src/cmd/src/cli/repl.rs +++ b/src/cmd/src/cli/repl.rs @@ -34,7 +34,6 @@ use common_telemetry::debug; use either::Either; use meta_client::client::MetaClientBuilder; use query::datafusion::DatafusionQueryEngine; -use query::logical_optimizer::LogicalOptimizer; use query::parser::QueryLanguageParser; use query::plan::LogicalPlan; use query::query_engine::{DefaultSerializer, QueryEngineState}; diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 828be6e99f11..93b9e0f920a6 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -55,9 +55,7 @@ use crate::error::{ TableNotFoundSnafu, TableReadOnlySnafu, UnsupportedExprSnafu, }; use crate::executor::QueryExecutor; -use crate::logical_optimizer::LogicalOptimizer; use crate::metrics::{OnDone, QUERY_STAGE_ELAPSED}; -use crate::physical_optimizer::PhysicalOptimizer; use crate::physical_wrapper::PhysicalPlanWrapperRef; use crate::plan::LogicalPlan; use crate::planner::{DfLogicalPlanner, LogicalPlanner}; @@ -310,6 +308,70 @@ impl DatafusionQueryEngine { } } } + + #[tracing::instrument(skip_all)] + pub fn optimize( + &self, + context: &QueryEngineContext, + plan: &LogicalPlan, + ) -> Result { + let _timer = metrics::OPTIMIZE_LOGICAL_ELAPSED.start_timer(); + match plan { + LogicalPlan::DfPlan(df_plan) => { + // Optimized by extension rules + let optimized_plan = self + .state + .optimize_by_extension_rules(df_plan.clone(), context) + .context(error::DatafusionSnafu) + .map_err(BoxedError::new) + .context(QueryExecutionSnafu)?; + + // Optimized by datafusion optimizer + let optimized_plan = self + .state + .session_state() + .optimize(&optimized_plan) + .context(error::DatafusionSnafu) + .map_err(BoxedError::new) + .context(QueryExecutionSnafu)?; + + Ok(LogicalPlan::DfPlan(optimized_plan)) + } + } + } + + #[tracing::instrument(skip_all)] + fn optimize_physical_plan( + &self, + ctx: &mut QueryEngineContext, + plan: Arc, + ) -> Result> { + let _timer = metrics::OPTIMIZE_PHYSICAL_ELAPSED.start_timer(); + + let state = ctx.state(); + let config = state.config_options(); + // skip optimize AnalyzeExec plan + let optimized_plan = if let Some(analyze_plan) = plan.as_any().downcast_ref::() + { + let mut new_plan = analyze_plan.input().clone(); + for optimizer in state.physical_optimizers() { + new_plan = optimizer + .optimize(new_plan, config) + .context(DataFusionSnafu)?; + } + Arc::new(DistAnalyzeExec::new(new_plan)) + } else { + let mut new_plan = plan; + for optimizer in state.physical_optimizers() { + new_plan = optimizer + .optimize(new_plan, config) + .context(DataFusionSnafu)?; + } + new_plan + }; + + Ok(optimized_plan) + } } #[async_trait] @@ -387,70 +449,6 @@ impl QueryEngine for DatafusionQueryEngine { } } -impl LogicalOptimizer for DatafusionQueryEngine { - #[tracing::instrument(skip_all)] - fn optimize(&self, context: &QueryEngineContext, plan: &LogicalPlan) -> Result { - let _timer = metrics::OPTIMIZE_LOGICAL_ELAPSED.start_timer(); - match plan { - LogicalPlan::DfPlan(df_plan) => { - // Optimized by extension rules - let optimized_plan = self - .state - .optimize_by_extension_rules(df_plan.clone(), context) - .context(error::DatafusionSnafu) - .map_err(BoxedError::new) - .context(QueryExecutionSnafu)?; - - // Optimized by datafusion optimizer - let optimized_plan = self - .state - .session_state() - .optimize(&optimized_plan) - .context(error::DatafusionSnafu) - .map_err(BoxedError::new) - .context(QueryExecutionSnafu)?; - - Ok(LogicalPlan::DfPlan(optimized_plan)) - } - } - } -} - -impl PhysicalOptimizer for DatafusionQueryEngine { - #[tracing::instrument(skip_all)] - fn optimize_physical_plan( - &self, - ctx: &mut QueryEngineContext, - plan: Arc, - ) -> Result> { - let _timer = metrics::OPTIMIZE_PHYSICAL_ELAPSED.start_timer(); - - let state = ctx.state(); - let config = state.config_options(); - // skip optimize AnalyzeExec plan - let optimized_plan = if let Some(analyze_plan) = plan.as_any().downcast_ref::() - { - let mut new_plan = analyze_plan.input().clone(); - for optimizer in state.physical_optimizers() { - new_plan = optimizer - .optimize(new_plan, config) - .context(DataFusionSnafu)?; - } - Arc::new(DistAnalyzeExec::new(new_plan)) - } else { - let mut new_plan = plan; - for optimizer in state.physical_optimizers() { - new_plan = optimizer - .optimize(new_plan, config) - .context(DataFusionSnafu)?; - } - new_plan - }; - - Ok(optimized_plan) - } -} - impl QueryExecutor for DatafusionQueryEngine { #[tracing::instrument(skip_all)] fn execute_stream( diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs index d4e9dbae66b2..d6dfc5e09734 100644 --- a/src/query/src/lib.rs +++ b/src/query/src/lib.rs @@ -24,11 +24,9 @@ pub mod dist_plan; pub mod dummy_catalog; pub mod error; pub mod executor; -pub mod logical_optimizer; pub mod metrics; mod optimizer; pub mod parser; -pub mod physical_optimizer; pub mod physical_wrapper; pub mod plan; pub mod planner; diff --git a/src/query/src/logical_optimizer.rs b/src/query/src/logical_optimizer.rs deleted file mode 100644 index ab9bff445879..000000000000 --- a/src/query/src/logical_optimizer.rs +++ /dev/null @@ -1,23 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use crate::error::Result; -use crate::plan::LogicalPlan; -use crate::QueryEngineContext; - -/// Logical plan optimizer, rewrite the [`LogicalPlan`] in some way. -pub trait LogicalOptimizer { - /// Optimize the `plan` - fn optimize(&self, context: &QueryEngineContext, plan: &LogicalPlan) -> Result; -} diff --git a/src/query/src/physical_optimizer.rs b/src/query/src/physical_optimizer.rs deleted file mode 100644 index 7a4c28513b96..000000000000 --- a/src/query/src/physical_optimizer.rs +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use datafusion::physical_plan::ExecutionPlan; - -use crate::error::Result; -use crate::query_engine::QueryEngineContext; - -pub trait PhysicalOptimizer { - fn optimize_physical_plan( - &self, - ctx: &mut QueryEngineContext, - plan: Arc, - ) -> Result>; -}