Skip to content

Commit

Permalink
Merge pull request #1125 from dbt-labs/remove-optimizer-set-to-try-loop
Browse files Browse the repository at this point in the history
Remove pointless loop over DataflowPlan optimizer sets
  • Loading branch information
tlento authored Apr 11, 2024
2 parents 9000634 + c013270 commit 5603dc7
Showing 1 changed file with 15 additions and 41 deletions.
56 changes: 15 additions & 41 deletions metricflow/engine/metricflow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,49 +497,23 @@ def _create_execution_plan(self, mf_query_request: MetricFlowQueryRequest) -> Me
time_dimension_specs=query_spec.time_dimension_specs,
)

dataflow_plan_optimizer_sets_to_try = (
(SourceScanOptimizer(),),
(),
)
dataflow_plan: Optional[DataflowPlan] = None
convert_to_execution_plan_result: Optional[ConvertToExecutionPlanResult] = None

for i, dataflow_plan_optimizer_set in enumerate(dataflow_plan_optimizer_sets_to_try):
try:
if query_spec.metric_specs:
dataflow_plan = self._dataflow_plan_builder.build_plan(
query_spec=query_spec,
output_sql_table=output_table,
output_selection_specs=output_selection_specs,
optimizers=(SourceScanOptimizer(),),
)
else:
dataflow_plan = self._dataflow_plan_builder.build_plan_for_distinct_values(query_spec=query_spec)

if len(dataflow_plan.sink_output_nodes) > 1:
raise NotImplementedError(
f"Multiple output nodes in the dataflow plan not yet supported. "
f"Got tasks: {dataflow_plan.sink_output_nodes}"
)
if query_spec.metric_specs:
dataflow_plan = self._dataflow_plan_builder.build_plan(
query_spec=query_spec,
output_sql_table=output_table,
output_selection_specs=output_selection_specs,
optimizers=(SourceScanOptimizer(),),
)
else:
dataflow_plan = self._dataflow_plan_builder.build_plan_for_distinct_values(query_spec=query_spec)

convert_to_execution_plan_result = self._to_execution_plan_converter.convert_to_execution_plan(
dataflow_plan
)
if len(dataflow_plan.sink_output_nodes) > 1:
raise NotImplementedError(
f"Multiple output nodes in the dataflow plan not yet supported. "
f"Got tasks: {dataflow_plan.sink_output_nodes}"
)

break
except Exception as e:
# Exception even if no optimizers were applied, so propagate the exception up.
if i == len(dataflow_plan_optimizer_sets_to_try) - 1:
raise e

logger.exception(
f"Got an exception building an execution plan using a dataflow plan created with optimizers: "
f"{dataflow_plan_optimizer_set}. In case the error was due to the optimizer producing an incorrect"
f"plan, retrying creating the dataflow plan with optimizers: "
f"{dataflow_plan_optimizer_sets_to_try[i+1]}"
)
assert dataflow_plan is not None
assert convert_to_execution_plan_result is not None
convert_to_execution_plan_result = self._to_execution_plan_converter.convert_to_execution_plan(dataflow_plan)

return MetricFlowExplainResult(
query_spec=query_spec,
Expand Down

0 comments on commit 5603dc7

Please sign in to comment.