Skip to content

Commit

Permalink
Enable PredicatePushdownOptimization for all MetricFlowEngine queries
Browse files Browse the repository at this point in the history
This effectively releases PredicatePushdownOptimization: the moment
this change is deployed to cloud, it will be enabled.

To allow rapid mitigation of any unexpected issues, this change
also parameterizes the query request object so that callers can
disable optimizations as needed. This means cloud services calling
this method can override the optimizer behavior without requiring
an update to MetricFlow.
  • Loading branch information
tlento committed Jun 25, 2024
1 parent 505a2e4 commit 7ab57a4
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 9 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240625-152952.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Enable predicate pushdown optimization by default for all callers
time: 2024-06-25T15:29:52.514224-07:00
custom:
Author: tlento
Issue: "1011"
5 changes: 5 additions & 0 deletions metricflow/dataflow/optimizer/dataflow_optimizer_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ class DataflowPlanOptimization(Enum):
SOURCE_SCAN = 0
PREDICATE_PUSHDOWN = 1

@staticmethod
def all_optimizations() -> FrozenSet[DataflowPlanOptimization]:
    """Return an immutable set containing every dataflow plan optimization.

    The members are listed explicitly, so a newly added enum member is only
    included once it is also added to the set below.
    """
    every_optimization = {
        DataflowPlanOptimization.SOURCE_SCAN,
        DataflowPlanOptimization.PREDICATE_PUSHDOWN,
    }
    return frozenset(every_optimization)


class DataflowPlanOptimizerFactory:
"""Factory class for initializing an enumerated set of optimizers.
Expand Down
11 changes: 8 additions & 3 deletions metricflow/engine/metricflow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional, Sequence, Tuple
from typing import FrozenSet, List, Optional, Sequence, Tuple

from dbt_semantic_interfaces.implementations.elements.dimension import PydanticDimensionTypeParams
from dbt_semantic_interfaces.implementations.filters.where_filter import PydanticWhereFilter
Expand Down Expand Up @@ -113,6 +113,7 @@ class MetricFlowQueryRequest:
order_by: Optional[Sequence[OrderByQueryParameter]] = None
min_max_only: bool = False
sql_optimization_level: SqlQueryOptimizationLevel = SqlQueryOptimizationLevel.O4
dataflow_plan_optimizations: FrozenSet[DataflowPlanOptimization] = DataflowPlanOptimization.all_optimizations()
query_type: MetricFlowQueryType = MetricFlowQueryType.METRIC

@staticmethod
Expand All @@ -129,6 +130,7 @@ def create_with_random_request_id( # noqa: D102
order_by_names: Optional[Sequence[str]] = None,
order_by: Optional[Sequence[OrderByQueryParameter]] = None,
sql_optimization_level: SqlQueryOptimizationLevel = SqlQueryOptimizationLevel.O4,
dataflow_plan_optimizations: FrozenSet[DataflowPlanOptimization] = DataflowPlanOptimization.all_optimizations(),
query_type: MetricFlowQueryType = MetricFlowQueryType.METRIC,
min_max_only: bool = False,
) -> MetricFlowQueryRequest:
Expand All @@ -146,6 +148,7 @@ def create_with_random_request_id( # noqa: D102
order_by_names=order_by_names,
order_by=order_by,
sql_optimization_level=sql_optimization_level,
dataflow_plan_optimizations=dataflow_plan_optimizations,
query_type=query_type,
min_max_only=min_max_only,
)
Expand Down Expand Up @@ -500,10 +503,12 @@ def _create_execution_plan(self, mf_query_request: MetricFlowQueryRequest) -> Me
dataflow_plan = self._dataflow_plan_builder.build_plan(
query_spec=query_spec,
output_selection_specs=output_selection_specs,
optimizations=frozenset({DataflowPlanOptimization.SOURCE_SCAN}),
optimizations=mf_query_request.dataflow_plan_optimizations,
)
else:
dataflow_plan = self._dataflow_plan_builder.build_plan_for_distinct_values(query_spec=query_spec)
dataflow_plan = self._dataflow_plan_builder.build_plan_for_distinct_values(
query_spec=query_spec, optimizations=mf_query_request.dataflow_plan_optimizations
)

if len(dataflow_plan.sink_nodes) > 1:
raise NotImplementedError(
Expand Down
10 changes: 4 additions & 6 deletions tests_metricflow/query_rendering/compare_rendered_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,14 @@ def render_and_check(
)

# Run dataflow -> sql conversion with all optimizers
optimizations = (
DataflowPlanOptimization.SOURCE_SCAN,
DataflowPlanOptimization.PREDICATE_PUSHDOWN,
)
if is_distinct_values_plan:
optimized_plan = dataflow_plan_builder.build_plan_for_distinct_values(
query_spec, optimizations=frozenset(optimizations)
query_spec, optimizations=DataflowPlanOptimization.all_optimizations()
)
else:
optimized_plan = dataflow_plan_builder.build_plan(query_spec, optimizations=frozenset(optimizations))
optimized_plan = dataflow_plan_builder.build_plan(
query_spec, optimizations=DataflowPlanOptimization.all_optimizations()
)
conversion_result = dataflow_to_sql_converter.convert_to_sql_query_plan(
sql_engine_type=sql_client.sql_engine_type,
dataflow_plan_node=optimized_plan.sink_node,
Expand Down

0 comments on commit 7ab57a4

Please sign in to comment.