diff --git a/metricflow/plan_conversion/dataflow_to_sql.py b/metricflow/plan_conversion/dataflow_to_sql.py
index 18bff46af..e63b8d1e6 100644
--- a/metricflow/plan_conversion/dataflow_to_sql.py
+++ b/metricflow/plan_conversion/dataflow_to_sql.py
@@ -187,26 +187,81 @@ def convert_to_sql_query_plan(
         sql_query_plan_id: Optional[DagId] = None,
     ) -> ConvertToSqlPlanResult:
         """Create an SQL query plan that represents the computation up to the given dataflow plan node."""
-        # TODO: Make this a more generally accessible attribute instead of checking against the
-        # BigQuery-ness of the engine
-        use_column_alias_in_group_by = sql_engine_type is SqlEngine.BIGQUERY
-
-        option_set = SqlGenerationOptionSet.options_for_level(
-            optimization_level, use_column_alias_in_group_by=use_column_alias_in_group_by
+        # In case there are bugs that raise exceptions at higher optimization levels, retry generation at a lower
+        # optimization level. Generally skip O0 (unless requested) as that level does not include the column pruner.
+        # Without that, the generated SQL can be enormous.
+        optimization_levels_to_attempt: Sequence[SqlQueryOptimizationLevel] = sorted(
+            # Union handles case if O0 was specifically requested.
+            set(
+                possible_level
+                for possible_level in SqlQueryOptimizationLevel
+                if SqlQueryOptimizationLevel.O1 <= possible_level <= optimization_level
+            ).union({optimization_level}),
+            reverse=True,
+        )
+        retried_at_lower_optimization_level = False
+        logger.debug(
+            LazyFormat(
+                "Attempting, in order, with optimization levels:",
+                optimization_levels_to_attempt=optimization_levels_to_attempt,
+            )
         )
+        for attempted_optimization_level in optimization_levels_to_attempt:
+            try:
+                # TODO: Make this a more generally accessible attribute instead of checking against the
+                # BigQuery-ness of the engine
+                use_column_alias_in_group_by = sql_engine_type is SqlEngine.BIGQUERY
 
-        logger.info(LazyFormat("Using option set:", option_set=option_set))
+                option_set = SqlGenerationOptionSet.options_for_level(
+                    attempted_optimization_level, use_column_alias_in_group_by=use_column_alias_in_group_by
+                )
 
-        nodes_to_convert_to_cte: FrozenSet[DataflowPlanNode] = frozenset()
-        if option_set.allow_cte:
-            nodes_to_convert_to_cte = self._get_nodes_to_convert_to_cte(dataflow_plan_node)
+                logger.info(
+                    LazyFormat("Using option set:", optimization_level=optimization_level, option_set=option_set)
+                )
 
-        return self.convert_using_specifics(
-            dataflow_plan_node=dataflow_plan_node,
-            sql_query_plan_id=sql_query_plan_id,
-            nodes_to_convert_to_cte=nodes_to_convert_to_cte,
-            optimizers=option_set.optimizers,
-        )
+                nodes_to_convert_to_cte: FrozenSet[DataflowPlanNode] = frozenset()
+                if option_set.allow_cte:
+                    nodes_to_convert_to_cte = self._get_nodes_to_convert_to_cte(dataflow_plan_node)
+
+                result = self.convert_using_specifics(
+                    dataflow_plan_node=dataflow_plan_node,
+                    sql_query_plan_id=sql_query_plan_id,
+                    nodes_to_convert_to_cte=nodes_to_convert_to_cte,
+                    optimizers=option_set.optimizers,
+                )
+
+                if retried_at_lower_optimization_level:
+                    logger.error(
+                        LazyFormat(
+                            "Successfully generated the SQL plan using an optimization level lower than the"
+                            " requested one. A lower one was used due to an exception using the requested one. Please "
+                            "investigate the cause for the exception.",
+                            requested_optimization_level=optimization_level,
+                            successful_optimization_level=attempted_optimization_level,
+                        )
+                    )
+
+                return result
+
+            except Exception as e:
+                if attempted_optimization_level is optimization_levels_to_attempt[-1]:
+                    logger.error(
+                        "Exhausted attempts to generate the SQL without exceptions."
+                        " Propagating the most recent exception."
+                    )
+                    raise e
+                retried_at_lower_optimization_level = True
+                logger.exception(
+                    LazyFormat(
+                        "Got an exception while generating the SQL plan. This indicates a bug that should be"
+                        " investigated, but retrying at a different optimization level to potentially avoid a"
+                        " user-facing error.",
+                        attempted_optimization_level=optimization_level,
+                    )
+                )
+
+        raise RuntimeError("Should have returned a result or raised an exception in the loop.")
 
     def convert_using_specifics(
         self,
@@ -216,7 +271,9 @@ def convert_using_specifics(
         optimizers: Sequence[SqlQueryPlanOptimizer],
     ) -> ConvertToSqlPlanResult:
         """Helper method to convert using specific options. Main use case are tests."""
-        logger.debug(LazyFormat("Converting to SQL", nodes_to_convert_to_cte=nodes_to_convert_to_cte))
+        logger.debug(
+            LazyFormat("Converting to SQL", nodes_to_convert_to_cte=[node.node_id for node in nodes_to_convert_to_cte])
+        )
         if len(nodes_to_convert_to_cte) == 0:
             # Avoid `DataflowNodeToSqlCteVisitor` code path for better isolation during rollout.