Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enforce order of application for DataflowPlanOptimizers #1307

Merged
merged 1 commit into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Sequence, Set, Tuple, Union
from typing import Dict, FrozenSet, List, Optional, Sequence, Set, Tuple, Union

from dbt_semantic_interfaces.enum_extension import assert_values_exhausted
from dbt_semantic_interfaces.implementations.metric import PydanticMetricTimeWindow
Expand Down Expand Up @@ -146,7 +146,7 @@ def build_plan(
query_spec: MetricFlowQuerySpec,
output_sql_table: Optional[SqlTable] = None,
output_selection_specs: Optional[InstanceSpecSet] = None,
optimizations: Sequence[DataflowPlanOptimization] = (),
optimizations: FrozenSet[DataflowPlanOptimization] = frozenset(),
) -> DataflowPlan:
"""Generate a plan for reading the results of a query with the given spec into a data_table or table."""
# Workaround for a Pycharm type inspection issue with decorators.
Expand Down Expand Up @@ -213,7 +213,7 @@ def _build_plan(
query_spec: MetricFlowQuerySpec,
output_sql_table: Optional[SqlTable],
output_selection_specs: Optional[InstanceSpecSet],
optimizations: Sequence[DataflowPlanOptimization],
optimizations: FrozenSet[DataflowPlanOptimization],
) -> DataflowPlan:
metrics_output_node = self._build_query_output_node(query_spec=query_spec)

Expand All @@ -229,7 +229,7 @@ def _build_plan(
plan = DataflowPlan(sink_nodes=[sink_node], plan_id=plan_id)
return self._optimize_plan(plan, optimizations)

def _optimize_plan(self, plan: DataflowPlan, optimizations: Sequence[DataflowPlanOptimization]) -> DataflowPlan:
def _optimize_plan(self, plan: DataflowPlan, optimizations: FrozenSet[DataflowPlanOptimization]) -> DataflowPlan:
optimizer_factory = DataflowPlanOptimizerFactory(self._node_data_set_resolver)
for optimizer in optimizer_factory.get_optimizers(optimizations):
logger.info(f"Applying {optimizer.__class__.__name__}")
Expand Down Expand Up @@ -737,7 +737,7 @@ def _build_metrics_output_node(
return CombineAggregatedOutputsNode(parent_nodes=output_nodes)

def build_plan_for_distinct_values(
self, query_spec: MetricFlowQuerySpec, optimizations: Sequence[DataflowPlanOptimization] = ()
self, query_spec: MetricFlowQuerySpec, optimizations: FrozenSet[DataflowPlanOptimization] = frozenset()
) -> DataflowPlan:
"""Generate a plan that would get the distinct values of a linkable instance.

Expand All @@ -749,7 +749,7 @@ def build_plan_for_distinct_values(

@log_runtime()
def _build_plan_for_distinct_values(
self, query_spec: MetricFlowQuerySpec, optimizations: Sequence[DataflowPlanOptimization]
self, query_spec: MetricFlowQuerySpec, optimizations: FrozenSet[DataflowPlanOptimization]
) -> DataflowPlan:
assert not query_spec.metric_specs, "Can't build distinct values plan with metrics."
query_level_filter_specs: Sequence[WhereFilterSpec] = ()
Expand Down
19 changes: 13 additions & 6 deletions metricflow/dataflow/optimizer/dataflow_optimizer_factory.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from enum import Enum
from typing import List, Sequence
from typing import FrozenSet, List, Sequence

from dbt_semantic_interfaces.enum_extension import assert_values_exhausted

Expand All @@ -12,10 +12,17 @@


class DataflowPlanOptimization(Enum):
"""Enumeration of optimization types available for execution."""
"""Enumeration of optimization types available for execution.

PREDICATE_PUSHDOWN = "predicate_pushdown"
SOURCE_SCAN = "source_scan"
Values indicate order of application. We apply the source scan optimizer first because it reduces input branches,
making for maximally parsimonious queries prior to application of predicate pushdown. Note this is safe only
because the SourceScanOptimizer combines from the CombineAggregatedOutputNode, and will only combine branches
from there to source if they are functionally identical (i.e., they have all of the same WhereConstraintNode
configurations).
"""

SOURCE_SCAN = 0
PREDICATE_PUSHDOWN = 1


class DataflowPlanOptimizerFactory:
Expand All @@ -32,10 +39,10 @@ def __init__(self, node_data_set_resolver: DataflowPlanNodeOutputDataSetResolver
"""
self._node_data_set_resolver = node_data_set_resolver

def get_optimizers(self, optimizations: Sequence[DataflowPlanOptimization]) -> Sequence[DataflowPlanOptimizer]:
def get_optimizers(self, optimizations: FrozenSet[DataflowPlanOptimization]) -> Sequence[DataflowPlanOptimizer]:
"""Initializes and returns a sequence of optimizers matching the input optimization requests."""
optimizers: List[DataflowPlanOptimizer] = []
for optimization in optimizations:
for optimization in sorted(list(optimizations), key=lambda x: x.value):
if optimization is DataflowPlanOptimization.SOURCE_SCAN:
optimizers.append(SourceScanOptimizer())
elif optimization is DataflowPlanOptimization.PREDICATE_PUSHDOWN:
Expand Down
2 changes: 1 addition & 1 deletion metricflow/engine/metricflow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ def _create_execution_plan(self, mf_query_request: MetricFlowQueryRequest) -> Me
dataflow_plan = self._dataflow_plan_builder.build_plan(
query_spec=query_spec,
output_selection_specs=output_selection_specs,
optimizations=(DataflowPlanOptimization.SOURCE_SCAN,),
optimizations=frozenset({DataflowPlanOptimization.SOURCE_SCAN}),
)
else:
dataflow_plan = self._dataflow_plan_builder.build_plan_for_distinct_values(query_spec=query_spec)
Expand Down
6 changes: 4 additions & 2 deletions tests_metricflow/query_rendering/compare_rendered_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ def render_and_check(
DataflowPlanOptimization.PREDICATE_PUSHDOWN,
)
if is_distinct_values_plan:
optimized_plan = dataflow_plan_builder.build_plan_for_distinct_values(query_spec, optimizations=optimizations)
optimized_plan = dataflow_plan_builder.build_plan_for_distinct_values(
query_spec, optimizations=frozenset(optimizations)
)
else:
optimized_plan = dataflow_plan_builder.build_plan(query_spec, optimizations=optimizations)
optimized_plan = dataflow_plan_builder.build_plan(query_spec, optimizations=frozenset(optimizations))
conversion_result = dataflow_to_sql_converter.convert_to_sql_query_plan(
sql_engine_type=sql_client.sql_engine_type,
dataflow_plan_node=optimized_plan.sink_node,
Expand Down
Loading