Skip to content

Commit

Permalink
/* PR_START p--misc 05 */ Add retries for SQL generation using lower …
Browse files Browse the repository at this point in the history
…optimization levels.
  • Loading branch information
plypaul committed Dec 13, 2024
1 parent 2b819ee commit 438f228
Showing 1 changed file with 74 additions and 17 deletions.
91 changes: 74 additions & 17 deletions metricflow/plan_conversion/dataflow_to_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,26 +187,81 @@ def convert_to_sql_query_plan(
sql_query_plan_id: Optional[DagId] = None,
) -> ConvertToSqlPlanResult:
"""Create an SQL query plan that represents the computation up to the given dataflow plan node."""
# TODO: Make this a more generally accessible attribute instead of checking against the
# BigQuery-ness of the engine
use_column_alias_in_group_by = sql_engine_type is SqlEngine.BIGQUERY

option_set = SqlGenerationOptionSet.options_for_level(
optimization_level, use_column_alias_in_group_by=use_column_alias_in_group_by
# In case there are bugs that raise exceptions at higher optimization levels, retry generation at a lower
# optimization level. Generally skip O0 (unless requested) as that level does not include the column pruner.
# Without that, the generated SQL can be enormous.
optimization_levels_to_attempt: Sequence[SqlQueryOptimizationLevel] = sorted(
# Union handles case if O0 was specifically requested.
set(
possible_level
for possible_level in SqlQueryOptimizationLevel
if SqlQueryOptimizationLevel.O1 <= possible_level <= optimization_level
).union({optimization_level}),
reverse=True,
)
retried_at_lower_optimization_level = False
logger.debug(
LazyFormat(
"Attempting to convert to a SQL plan with optimization levels:",
optimization_levels_to_attempt=optimization_levels_to_attempt,
)
)
for attempted_optimization_level in optimization_levels_to_attempt:
try:
# TODO: Make this a more generally accessible attribute instead of checking against the
# BigQuery-ness of the engine
use_column_alias_in_group_by = sql_engine_type is SqlEngine.BIGQUERY

logger.info(LazyFormat("Using option set:", option_set=option_set))
option_set = SqlGenerationOptionSet.options_for_level(
attempted_optimization_level, use_column_alias_in_group_by=use_column_alias_in_group_by
)

nodes_to_convert_to_cte: FrozenSet[DataflowPlanNode] = frozenset()
if option_set.allow_cte:
nodes_to_convert_to_cte = self._get_nodes_to_convert_to_cte(dataflow_plan_node)
logger.info(
LazyFormat("Using option set:", optimization_level=optimization_level, option_set=option_set)
)

return self.convert_using_specifics(
dataflow_plan_node=dataflow_plan_node,
sql_query_plan_id=sql_query_plan_id,
nodes_to_convert_to_cte=nodes_to_convert_to_cte,
optimizers=option_set.optimizers,
)
nodes_to_convert_to_cte: FrozenSet[DataflowPlanNode] = frozenset()
if option_set.allow_cte:
nodes_to_convert_to_cte = self._get_nodes_to_convert_to_cte(dataflow_plan_node)

result = self.convert_using_specifics(
dataflow_plan_node=dataflow_plan_node,
sql_query_plan_id=sql_query_plan_id,
nodes_to_convert_to_cte=nodes_to_convert_to_cte,
optimizers=option_set.optimizers,
)

if retried_at_lower_optimization_level:
logger.error(
LazyFormat(
"Successfully generated the SQL plan using an optimization level lower than the"
" requested one. A lower one was used due to an exception using the requested one. Please "
"investigate the cause for the exception.",
requested_optimization_level=optimization_level,
successful_optimization_level=attempted_optimization_level,
)
)

return result

except Exception as e:
if optimization_level is optimization_levels_to_attempt[-1]:
logger.error(
"Exhausted attempts to generate the SQL without exceptions."
" Propagating the most recent exception."
)
raise e
retried_at_lower_optimization_level = True
logger.exception(
LazyFormat(
"Got an exception while generating the SQL plan. This indicates a bug that should be"
" investigated, but retrying at a different optimization level to potentially avoid a"
" user-facing error.",
attempted_optimization_level=optimization_level,
)
)

raise RuntimeError("Should have returned a result or raised an exception in the loop.")

def convert_using_specifics(
self,
Expand All @@ -216,7 +271,9 @@ def convert_using_specifics(
optimizers: Sequence[SqlQueryPlanOptimizer],
) -> ConvertToSqlPlanResult:
"""Helper method to convert using specific options. Main use case are tests."""
logger.debug(LazyFormat("Converting to SQL", nodes_to_convert_to_cte=nodes_to_convert_to_cte))
logger.debug(
LazyFormat("Converting to SQL", nodes_to_convert_to_cte=[node.node_id for node in nodes_to_convert_to_cte])
)

if len(nodes_to_convert_to_cte) == 0:
# Avoid `DataflowNodeToSqlCteVisitor` code path for better isolation during rollout.
Expand Down

0 comments on commit 438f228

Please sign in to comment.