From 32707c0a0781467f4bd670c00102efad167b221a Mon Sep 17 00:00:00 2001 From: tlento Date: Tue, 25 Jun 2024 15:03:53 -0700 Subject: [PATCH] Enforce order of application for DataflowPlanOptimizers We want to make sure optimizers are run in the same order from all outside callers, and a subsequent change will broaden our exposure to dbt cloud service callers. Rather than expecting everybody to pick the "right" optimization order, we simply accept a set of optimizations to run and order them internally. --- .../dataflow/builder/dataflow_plan_builder.py | 12 ++++++------ .../optimizer/dataflow_optimizer_factory.py | 19 +++++++++++++------ metricflow/engine/metricflow_engine.py | 2 +- .../query_rendering/compare_rendered_query.py | 6 ++++-- 4 files changed, 24 insertions(+), 15 deletions(-) diff --git a/metricflow/dataflow/builder/dataflow_plan_builder.py b/metricflow/dataflow/builder/dataflow_plan_builder.py index 70a19a869a..096d63edbf 100644 --- a/metricflow/dataflow/builder/dataflow_plan_builder.py +++ b/metricflow/dataflow/builder/dataflow_plan_builder.py @@ -3,7 +3,7 @@ import logging import time from dataclasses import dataclass -from typing import Dict, List, Optional, Sequence, Set, Tuple, Union +from typing import Dict, FrozenSet, List, Optional, Sequence, Set, Tuple, Union from dbt_semantic_interfaces.enum_extension import assert_values_exhausted from dbt_semantic_interfaces.implementations.metric import PydanticMetricTimeWindow @@ -146,7 +146,7 @@ def build_plan( query_spec: MetricFlowQuerySpec, output_sql_table: Optional[SqlTable] = None, output_selection_specs: Optional[InstanceSpecSet] = None, - optimizations: Sequence[DataflowPlanOptimization] = (), + optimizations: FrozenSet[DataflowPlanOptimization] = frozenset(), ) -> DataflowPlan: """Generate a plan for reading the results of a query with the given spec into a data_table or table.""" # Workaround for a Pycharm type inspection issue with decorators. @@ -213,7 +213,7 @@ def _build_plan( query_spec: MetricFlowQuerySpec, output_sql_table: Optional[SqlTable], output_selection_specs: Optional[InstanceSpecSet], - optimizations: Sequence[DataflowPlanOptimization], + optimizations: FrozenSet[DataflowPlanOptimization], ) -> DataflowPlan: metrics_output_node = self._build_query_output_node(query_spec=query_spec) @@ -229,7 +229,7 @@ def _build_plan( plan = DataflowPlan(sink_nodes=[sink_node], plan_id=plan_id) return self._optimize_plan(plan, optimizations) - def _optimize_plan(self, plan: DataflowPlan, optimizations: Sequence[DataflowPlanOptimization]) -> DataflowPlan: + def _optimize_plan(self, plan: DataflowPlan, optimizations: FrozenSet[DataflowPlanOptimization]) -> DataflowPlan: optimizer_factory = DataflowPlanOptimizerFactory(self._node_data_set_resolver) for optimizer in optimizer_factory.get_optimizers(optimizations): logger.info(f"Applying {optimizer.__class__.__name__}") @@ -737,7 +737,7 @@ def _build_metrics_output_node( return CombineAggregatedOutputsNode(parent_nodes=output_nodes) def build_plan_for_distinct_values( - self, query_spec: MetricFlowQuerySpec, optimizations: Sequence[DataflowPlanOptimization] = () + self, query_spec: MetricFlowQuerySpec, optimizations: FrozenSet[DataflowPlanOptimization] = frozenset() ) -> DataflowPlan: """Generate a plan that would get the distinct values of a linkable instance. @@ -749,7 +749,7 @@ def build_plan_for_distinct_values( @log_runtime() def _build_plan_for_distinct_values( - self, query_spec: MetricFlowQuerySpec, optimizations: Sequence[DataflowPlanOptimization] + self, query_spec: MetricFlowQuerySpec, optimizations: FrozenSet[DataflowPlanOptimization] ) -> DataflowPlan: assert not query_spec.metric_specs, "Can't build distinct values plan with metrics." query_level_filter_specs: Sequence[WhereFilterSpec] = () diff --git a/metricflow/dataflow/optimizer/dataflow_optimizer_factory.py b/metricflow/dataflow/optimizer/dataflow_optimizer_factory.py index 3d28acd43e..8ccc8d8bba 100644 --- a/metricflow/dataflow/optimizer/dataflow_optimizer_factory.py +++ b/metricflow/dataflow/optimizer/dataflow_optimizer_factory.py @@ -1,7 +1,7 @@ from __future__ import annotations from enum import Enum -from typing import List, Sequence +from typing import FrozenSet, List, Sequence from dbt_semantic_interfaces.enum_extension import assert_values_exhausted @@ -12,10 +12,17 @@ class DataflowPlanOptimization(Enum): - """Enumeration of optimization types available for execution.""" + """Enumeration of optimization types available for execution. - PREDICATE_PUSHDOWN = "predicate_pushdown" - SOURCE_SCAN = "source_scan" + Values indicate order of application. We apply the source scan optimizer first because it reduces input branches, + making for maximally parsimonious queries prior to application of predicate pushdown. Note this is safe only + because the SourceScanOptimizer combines from the CombineAggregatedOutputNode, and will only combine branches + from there to source if they are functionally identical (i.e., they have all of the same WhereConstraintNode + configurations). + """ + + SOURCE_SCAN = 0 + PREDICATE_PUSHDOWN = 1 class DataflowPlanOptimizerFactory: @@ -32,10 +39,10 @@ def __init__(self, node_data_set_resolver: DataflowPlanNodeOutputDataSetResolver """ self._node_data_set_resolver = node_data_set_resolver - def get_optimizers(self, optimizations: Sequence[DataflowPlanOptimization]) -> Sequence[DataflowPlanOptimizer]: + def get_optimizers(self, optimizations: FrozenSet[DataflowPlanOptimization]) -> Sequence[DataflowPlanOptimizer]: """Initializes and returns a sequence of optimizers matching the input optimization requests.""" optimizers: List[DataflowPlanOptimizer] = [] - for optimization in optimizations: + for optimization in sorted(list(optimizations), key=lambda x: x.value): if optimization is DataflowPlanOptimization.SOURCE_SCAN: optimizers.append(SourceScanOptimizer()) elif optimization is DataflowPlanOptimization.PREDICATE_PUSHDOWN: diff --git a/metricflow/engine/metricflow_engine.py b/metricflow/engine/metricflow_engine.py index 93a15657d0..ec85ba9278 100644 --- a/metricflow/engine/metricflow_engine.py +++ b/metricflow/engine/metricflow_engine.py @@ -500,7 +500,7 @@ def _create_execution_plan(self, mf_query_request: MetricFlowQueryRequest) -> Me dataflow_plan = self._dataflow_plan_builder.build_plan( query_spec=query_spec, output_selection_specs=output_selection_specs, - optimizations=(DataflowPlanOptimization.SOURCE_SCAN,), + optimizations=frozenset({DataflowPlanOptimization.SOURCE_SCAN}), ) else: dataflow_plan = self._dataflow_plan_builder.build_plan_for_distinct_values(query_spec=query_spec) diff --git a/tests_metricflow/query_rendering/compare_rendered_query.py b/tests_metricflow/query_rendering/compare_rendered_query.py index a5d37c7348..d13085a928 100644 --- a/tests_metricflow/query_rendering/compare_rendered_query.py +++ b/tests_metricflow/query_rendering/compare_rendered_query.py @@ -57,9 +57,11 @@ def render_and_check( DataflowPlanOptimization.PREDICATE_PUSHDOWN, ) if is_distinct_values_plan: - optimized_plan = dataflow_plan_builder.build_plan_for_distinct_values(query_spec, optimizations=optimizations) + optimized_plan = dataflow_plan_builder.build_plan_for_distinct_values( + query_spec, optimizations=frozenset(optimizations) + ) else: - optimized_plan = dataflow_plan_builder.build_plan(query_spec, optimizations=optimizations) + optimized_plan = dataflow_plan_builder.build_plan(query_spec, optimizations=frozenset(optimizations)) conversion_result = dataflow_to_sql_converter.convert_to_sql_query_plan( sql_engine_type=sql_client.sql_engine_type, dataflow_plan_node=optimized_plan.sink_node,