Skip to content

Commit

Permalink
Enforce order of application for DataflowPlanOptimizers (#1307)
Browse files Browse the repository at this point in the history
We want to make sure optimizers are run in the same order from
all outside callers, and a subsequent change will broaden our
exposure to dbt cloud service callers. Rather than expecting
everybody to pick the "right" optimization order, we simply
accept a set of optimizations to run and order them internally.
  • Loading branch information
tlento authored Jun 26, 2024
1 parent 3fb97c8 commit 1e97bf0
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 15 deletions.
12 changes: 6 additions & 6 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Sequence, Set, Tuple, Union
from typing import Dict, FrozenSet, List, Optional, Sequence, Set, Tuple, Union

from dbt_semantic_interfaces.enum_extension import assert_values_exhausted
from dbt_semantic_interfaces.implementations.metric import PydanticMetricTimeWindow
Expand Down Expand Up @@ -146,7 +146,7 @@ def build_plan(
query_spec: MetricFlowQuerySpec,
output_sql_table: Optional[SqlTable] = None,
output_selection_specs: Optional[InstanceSpecSet] = None,
optimizations: Sequence[DataflowPlanOptimization] = (),
optimizations: FrozenSet[DataflowPlanOptimization] = frozenset(),
) -> DataflowPlan:
"""Generate a plan for reading the results of a query with the given spec into a data_table or table."""
# Workaround for a Pycharm type inspection issue with decorators.
Expand Down Expand Up @@ -213,7 +213,7 @@ def _build_plan(
query_spec: MetricFlowQuerySpec,
output_sql_table: Optional[SqlTable],
output_selection_specs: Optional[InstanceSpecSet],
optimizations: Sequence[DataflowPlanOptimization],
optimizations: FrozenSet[DataflowPlanOptimization],
) -> DataflowPlan:
metrics_output_node = self._build_query_output_node(query_spec=query_spec)

Expand All @@ -229,7 +229,7 @@ def _build_plan(
plan = DataflowPlan(sink_nodes=[sink_node], plan_id=plan_id)
return self._optimize_plan(plan, optimizations)

def _optimize_plan(self, plan: DataflowPlan, optimizations: Sequence[DataflowPlanOptimization]) -> DataflowPlan:
def _optimize_plan(self, plan: DataflowPlan, optimizations: FrozenSet[DataflowPlanOptimization]) -> DataflowPlan:
optimizer_factory = DataflowPlanOptimizerFactory(self._node_data_set_resolver)
for optimizer in optimizer_factory.get_optimizers(optimizations):
logger.info(f"Applying {optimizer.__class__.__name__}")
Expand Down Expand Up @@ -737,7 +737,7 @@ def _build_metrics_output_node(
return CombineAggregatedOutputsNode(parent_nodes=output_nodes)

def build_plan_for_distinct_values(
self, query_spec: MetricFlowQuerySpec, optimizations: Sequence[DataflowPlanOptimization] = ()
self, query_spec: MetricFlowQuerySpec, optimizations: FrozenSet[DataflowPlanOptimization] = frozenset()
) -> DataflowPlan:
"""Generate a plan that would get the distinct values of a linkable instance.
Expand All @@ -749,7 +749,7 @@ def build_plan_for_distinct_values(

@log_runtime()
def _build_plan_for_distinct_values(
self, query_spec: MetricFlowQuerySpec, optimizations: Sequence[DataflowPlanOptimization]
self, query_spec: MetricFlowQuerySpec, optimizations: FrozenSet[DataflowPlanOptimization]
) -> DataflowPlan:
assert not query_spec.metric_specs, "Can't build distinct values plan with metrics."
query_level_filter_specs: Sequence[WhereFilterSpec] = ()
Expand Down
19 changes: 13 additions & 6 deletions metricflow/dataflow/optimizer/dataflow_optimizer_factory.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from enum import Enum
from typing import List, Sequence
from typing import FrozenSet, List, Sequence

from dbt_semantic_interfaces.enum_extension import assert_values_exhausted

Expand All @@ -12,10 +12,17 @@


class DataflowPlanOptimization(Enum):
"""Enumeration of optimization types available for execution."""
"""Enumeration of optimization types available for execution.
PREDICATE_PUSHDOWN = "predicate_pushdown"
SOURCE_SCAN = "source_scan"
Values indicate order of application. We apply the source scan optimizer first because it reduces input branches,
making for maximally parsimonious queries prior to application of predicate pushdown. Note this is safe only
because the SourceScanOptimizer combines from the CombineAggregatedOutputNode, and will only combine branches
from there to source if they are functionally identical (i.e., they have all of the same WhereConstraintNode
configurations).
"""

SOURCE_SCAN = 0
PREDICATE_PUSHDOWN = 1


class DataflowPlanOptimizerFactory:
Expand All @@ -32,10 +39,10 @@ def __init__(self, node_data_set_resolver: DataflowPlanNodeOutputDataSetResolver
"""
self._node_data_set_resolver = node_data_set_resolver

def get_optimizers(self, optimizations: Sequence[DataflowPlanOptimization]) -> Sequence[DataflowPlanOptimizer]:
def get_optimizers(self, optimizations: FrozenSet[DataflowPlanOptimization]) -> Sequence[DataflowPlanOptimizer]:
"""Initializes and returns a sequence of optimizers matching the input optimization requests."""
optimizers: List[DataflowPlanOptimizer] = []
for optimization in optimizations:
for optimization in sorted(list(optimizations), key=lambda x: x.value):
if optimization is DataflowPlanOptimization.SOURCE_SCAN:
optimizers.append(SourceScanOptimizer())
elif optimization is DataflowPlanOptimization.PREDICATE_PUSHDOWN:
Expand Down
2 changes: 1 addition & 1 deletion metricflow/engine/metricflow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ def _create_execution_plan(self, mf_query_request: MetricFlowQueryRequest) -> Me
dataflow_plan = self._dataflow_plan_builder.build_plan(
query_spec=query_spec,
output_selection_specs=output_selection_specs,
optimizations=(DataflowPlanOptimization.SOURCE_SCAN,),
optimizations=frozenset({DataflowPlanOptimization.SOURCE_SCAN}),
)
else:
dataflow_plan = self._dataflow_plan_builder.build_plan_for_distinct_values(query_spec=query_spec)
Expand Down
6 changes: 4 additions & 2 deletions tests_metricflow/query_rendering/compare_rendered_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ def render_and_check(
DataflowPlanOptimization.PREDICATE_PUSHDOWN,
)
if is_distinct_values_plan:
optimized_plan = dataflow_plan_builder.build_plan_for_distinct_values(query_spec, optimizations=optimizations)
optimized_plan = dataflow_plan_builder.build_plan_for_distinct_values(
query_spec, optimizations=frozenset(optimizations)
)
else:
optimized_plan = dataflow_plan_builder.build_plan(query_spec, optimizations=optimizations)
optimized_plan = dataflow_plan_builder.build_plan(query_spec, optimizations=frozenset(optimizations))
conversion_result = dataflow_to_sql_converter.convert_to_sql_query_plan(
sql_engine_type=sql_client.sql_engine_type,
dataflow_plan_node=optimized_plan.sink_node,
Expand Down

0 comments on commit 1e97bf0

Please sign in to comment.