diff --git a/metricflow/engine/metricflow_engine.py b/metricflow/engine/metricflow_engine.py index 72801591e1..6648f3aab3 100644 --- a/metricflow/engine/metricflow_engine.py +++ b/metricflow/engine/metricflow_engine.py @@ -497,49 +497,23 @@ def _create_execution_plan(self, mf_query_request: MetricFlowQueryRequest) -> Me time_dimension_specs=query_spec.time_dimension_specs, ) - dataflow_plan_optimizer_sets_to_try = ( - (SourceScanOptimizer(),), - (), - ) - dataflow_plan: Optional[DataflowPlan] = None - convert_to_execution_plan_result: Optional[ConvertToExecutionPlanResult] = None - - for i, dataflow_plan_optimizer_set in enumerate(dataflow_plan_optimizer_sets_to_try): - try: - if query_spec.metric_specs: - dataflow_plan = self._dataflow_plan_builder.build_plan( - query_spec=query_spec, - output_sql_table=output_table, - output_selection_specs=output_selection_specs, - optimizers=(SourceScanOptimizer(),), - ) - else: - dataflow_plan = self._dataflow_plan_builder.build_plan_for_distinct_values(query_spec=query_spec) - - if len(dataflow_plan.sink_output_nodes) > 1: - raise NotImplementedError( - f"Multiple output nodes in the dataflow plan not yet supported. " - f"Got tasks: {dataflow_plan.sink_output_nodes}" - ) + if query_spec.metric_specs: + dataflow_plan = self._dataflow_plan_builder.build_plan( + query_spec=query_spec, + output_sql_table=output_table, + output_selection_specs=output_selection_specs, + optimizers=(SourceScanOptimizer(),), + ) + else: + dataflow_plan = self._dataflow_plan_builder.build_plan_for_distinct_values(query_spec=query_spec) - convert_to_execution_plan_result = self._to_execution_plan_converter.convert_to_execution_plan( - dataflow_plan - ) + if len(dataflow_plan.sink_output_nodes) > 1: + raise NotImplementedError( + f"Multiple output nodes in the dataflow plan not yet supported. " + f"Got tasks: {dataflow_plan.sink_output_nodes}" + ) - break - except Exception as e: - # Exception even if no optimizers were applied, so propagate the exception up. - if i == len(dataflow_plan_optimizer_sets_to_try) - 1: - raise e - - logger.exception( - f"Got an exception building an execution plan using a dataflow plan created with optimizers: " - f"{dataflow_plan_optimizer_set}. In case the error was due to the optimizer producing an incorrect" - f"plan, retrying creating the dataflow plan with optimizers: " - f"{dataflow_plan_optimizer_sets_to_try[i+1]}" - ) - assert dataflow_plan is not None - assert convert_to_execution_plan_result is not None + convert_to_execution_plan_result = self._to_execution_plan_converter.convert_to_execution_plan(dataflow_plan) return MetricFlowExplainResult( query_spec=query_spec,