diff --git a/metricflow/dataflow/builder/dataflow_plan_builder.py b/metricflow/dataflow/builder/dataflow_plan_builder.py
index d26894d2c4..e1d5b2df59 100644
--- a/metricflow/dataflow/builder/dataflow_plan_builder.py
+++ b/metricflow/dataflow/builder/dataflow_plan_builder.py
@@ -82,7 +82,11 @@ from metricflow.dataflow.nodes.write_to_table import WriteToResultTableNode
 from metricflow.dataflow.optimizer.dataflow_plan_optimizer import DataflowPlanOptimizer
 from metricflow.dataset.dataset_classes import DataSet
-from metricflow.plan_conversion.node_processor import PredicatePushdownParameters, PreJoinNodeProcessor
+from metricflow.plan_conversion.node_processor import (
+    PredicatePushdownParameters,
+    PredicatePushdownState,
+    PreJoinNodeProcessor,
+)
 from metricflow.sql.sql_table import SqlTable
 
 logger = logging.getLogger(__name__)
@@ -237,6 +241,16 @@ def _build_aggregated_conversion_node(
         constant_properties: Optional[Sequence[ConstantPropertyInput]] = None,
     ) -> DataflowPlanNode:
         """Builds a node that contains aggregated values of conversions and opportunities."""
+        # Pushdown parameters vary with conversion metrics due to the way the time joins are applied.
+        # Due to other outstanding issues with conversion metric filters, we disable predicate
+        # pushdown for any filter parameter set that is not part of the original time range constraint
+        # implementation.
+        disabled_pushdown_parameters = PredicatePushdownParameters.with_pushdown_disabled()
+        time_range_only_pushdown_parameters = PredicatePushdownParameters(
+            time_range_constraint=predicate_pushdown_params.time_range_constraint,
+            pushdown_state=PredicatePushdownState.ENABLED_FOR_TIME_RANGE_ONLY,
+        )
+
         # Build measure recipes
         base_required_linkable_specs, _ = self.__get_required_and_extraneous_linkable_specs(
             queried_linkable_specs=queried_linkable_specs,
@@ -244,14 +258,13 @@
         )
         base_measure_recipe = self._find_dataflow_recipe(
             measure_spec_properties=self._build_measure_spec_properties([base_measure_spec.measure_spec]),
-            predicate_pushdown_params=predicate_pushdown_params,
+            predicate_pushdown_params=time_range_only_pushdown_parameters,
             linkable_spec_set=base_required_linkable_specs,
         )
         logger.info(f"Recipe for base measure aggregation:\n{mf_pformat(base_measure_recipe)}")
         conversion_measure_recipe = self._find_dataflow_recipe(
             measure_spec_properties=self._build_measure_spec_properties([conversion_measure_spec.measure_spec]),
-            # TODO - Pushdown: Evaluate the potential for applying time constraints and other predicates for conversion
-            predicate_pushdown_params=PredicatePushdownParameters(time_range_constraint=None),
+            predicate_pushdown_params=disabled_pushdown_parameters,
             linkable_spec_set=LinkableSpecSet(),
         )
         logger.info(f"Recipe for conversion measure aggregation:\n{mf_pformat(conversion_measure_recipe)}")
@@ -268,7 +281,7 @@
         aggregated_base_measure_node = self.build_aggregated_measure(
             metric_input_measure_spec=base_measure_spec,
             queried_linkable_specs=queried_linkable_specs,
-            predicate_pushdown_params=predicate_pushdown_params,
+            predicate_pushdown_params=time_range_only_pushdown_parameters,
         )
 
         # Build unaggregated conversions source node
@@ -337,10 +350,14 @@
             required_local_linkable_specs=base_measure_recipe.required_local_linkable_specs,
             join_linkable_instances_recipes=base_measure_recipe.join_linkable_instances_recipes,
         )
+        # TODO: Refine conversion metric configuration to fit into the standard dataflow plan building model
+        # In this case we override the measure recipe, which currently results in us bypassing predicate pushdown.
+        # Rather than relying on happenstance in the way the code is laid out, we also explicitly disable
+        # predicate pushdown until we are ready to fully support it for conversion metrics.
         aggregated_conversions_node = self.build_aggregated_measure(
             metric_input_measure_spec=conversion_measure_spec,
             queried_linkable_specs=queried_linkable_specs,
-            predicate_pushdown_params=predicate_pushdown_params,
+            predicate_pushdown_params=disabled_pushdown_parameters,
             measure_recipe=recipe_with_join_conversion_source_node,
         )
 
@@ -492,11 +509,10 @@
             if not metric_spec.has_time_offset:
                 filter_specs.extend(metric_spec.filter_specs)
 
-            # TODO - Pushdown: use parameters to disable pushdown operations instead of clobbering the constraints
             metric_pushdown_params = (
                 predicate_pushdown_params
                 if not metric_spec.has_time_offset
-                else PredicatePushdownParameters(time_range_constraint=None)
+                else PredicatePushdownParameters.with_pushdown_disabled()
             )
 
             parent_nodes.append(
@@ -867,7 +883,7 @@
             node_data_set_resolver=self._node_data_set_resolver,
         )
         # TODO - Pushdown: Encapsulate this in the node processor
-        if predicate_pushdown_params.time_range_constraint:
+        if predicate_pushdown_params.is_pushdown_enabled and predicate_pushdown_params.time_range_constraint:
             candidate_nodes_for_left_side_of_join = list(
                 node_processor.add_time_range_constraint(
                     source_nodes=candidate_nodes_for_left_side_of_join,
@@ -1298,15 +1314,16 @@
                 + indent(f"\nmeasure_specs:\n{mf_pformat([measure_spec])}")
                 + indent(f"\nevaluation:\n{mf_pformat(required_linkable_specs)}")
             )
-
-        # TODO - Pushdown: Update this to be more robust to additional pushdown parameters
         measure_time_constraint = (
             (cumulative_metric_adjusted_time_constraint or predicate_pushdown_params.time_range_constraint)
             # If joining to time spine for time offset, constraints will be applied after that join.
             if not before_aggregation_time_spine_join_description
            else None
        )
-        measure_pushdown_params = PredicatePushdownParameters(time_range_constraint=measure_time_constraint)
+        measure_pushdown_params = PredicatePushdownParameters.with_time_range_constraint(
+            predicate_pushdown_params, time_range_constraint=measure_time_constraint
+        )
+
         find_recipe_start_time = time.time()
         measure_recipe = self._find_dataflow_recipe(
             measure_spec_properties=measure_properties,
diff --git a/metricflow/plan_conversion/node_processor.py b/metricflow/plan_conversion/node_processor.py
index e21649e110..4fd46a7741 100644
--- a/metricflow/plan_conversion/node_processor.py
+++ b/metricflow/plan_conversion/node_processor.py
@@ -1,9 +1,11 @@
 from __future__ import annotations
 
+import dataclasses
 import logging
-from dataclasses import dataclass
+from enum import Enum
 from typing import List, Optional, Sequence, Set
 
+from dbt_semantic_interfaces.enum_extension import assert_values_exhausted
 from dbt_semantic_interfaces.references import EntityReference, TimeDimensionReference
 from metricflow_semantics.filters.time_constraint import TimeRangeConstraint
 from metricflow_semantics.mf_logging.pretty_print import mf_pformat
@@ -28,7 +30,7 @@
 logger = logging.getLogger(__name__)
 
 
-@dataclass(frozen=True)
+@dataclasses.dataclass(frozen=True)
 class MultiHopJoinCandidateLineage:
     """Describes how the multi-hop join candidate was formed.
@@ -48,7 +50,7 @@ class MultiHopJoinCandidateLineage:
     join_second_node_by_entity: LinklessEntitySpec
 
 
-@dataclass(frozen=True)
+@dataclasses.dataclass(frozen=True)
 class MultiHopJoinCandidate:
     """A candidate node containing linkable specs that is join of other nodes.
 
     It's used to resolve multi-hop queries.
@@ -59,14 +61,111 @@ class MultiHopJoinCandidate:
     lineage: MultiHopJoinCandidateLineage
 
 
-@dataclass(frozen=True)
+class PredicatePushdownState(Enum):
+    """Enumeration of constraint states describing when predicate pushdown may operate in a dataflow plan.
+
+    This is necessary for holistic checks against the set of potentially enabled pushdown operations, because the
+    scenario where we only allow time range updates requires careful overriding of other pushdown properties.
+
+    Note: the time_range_only state is a backwards compatibility shim for use with conversion metrics while
+    we determine how best to support predicate pushdown for conversion metrics. It may have longer term utility,
+    but ideally we'd collapse this into a single enabled/disabled boolean property.
+    """
+
+    DISABLED = "disabled"
+    FULLY_ENABLED = "fully_enabled"
+    ENABLED_FOR_TIME_RANGE_ONLY = "time_range_only"
+
+
+@dataclasses.dataclass(frozen=True)
 class PredicatePushdownParameters:
-    """Container class for managing filter predicate pushdown.
+    """Container class for managing information about whether and how to do filter predicate pushdown.
 
-    Stores time constraint information for applying pre-join time filters.
+    The time_range_constraint property holds the time window for setting up a time range filter expression.
     """
 
-    time_range_constraint: Optional[TimeRangeConstraint]
+    _PREDICATE_METADATA_KEY = "is_predicate_property"
+
+    time_range_constraint: Optional[TimeRangeConstraint] = dataclasses.field(metadata={_PREDICATE_METADATA_KEY: True})
+    pushdown_state: PredicatePushdownState = PredicatePushdownState.FULLY_ENABLED
+
+    def __post_init__(self) -> None:
+        """Validation to ensure pushdown properties are configured correctly.
+
+        In particular, this asserts that cases where pushdown is disabled cannot leak pushdown operations via
+        outside property access - if pushdown is disabled, no further pushdown operations of any kind are allowed
+        on that particular code branch.
+        """
+        if self.pushdown_state is PredicatePushdownState.FULLY_ENABLED:
+            return
+
+        invalid_predicate_property_names = {
+            field.name for field in dataclasses.fields(self) if field.metadata.get(self._PREDICATE_METADATA_KEY)
+        }
+
+        if self.pushdown_state is PredicatePushdownState.ENABLED_FOR_TIME_RANGE_ONLY:
+            # We don't do validation for time range constraint configuration - having None set in this state is the
+            # equivalent of disabling predicate pushdown, but that time constraint value might be updated later,
+            # and so we do not block overrides to (or from) None to avoid meaningless bookkeeping at callsites.
+            # Also, we keep the magic string name Python uses hidden in here instead of expanding access to it.
+            invalid_predicate_property_names.remove("time_range_constraint")
+
+        instance_configuration = dataclasses.asdict(self)
+        invalid_disabled_properties = {
+            property_name: value
+            for property_name, value in instance_configuration.items()
+            if property_name in invalid_predicate_property_names and value is not None
+        }
+
+        assert not invalid_disabled_properties, (
+            "Invalid pushdown parameter configuration! Disabled pushdown parameters cannot have properties "
+            "set that may lead to improper access and use in other contexts, as that can result in unintended "
+            "filtering operations in cases where these properties are accessed without appropriate checks against "
+            "pushdown configuration. This indicates that pushdown is configured for limited scope operations, but "
+            f"other predicate properties are set to non-None values.\nFull configuration: {instance_configuration}\n"
+            f"Invalid predicate properties: {invalid_disabled_properties}"
+        )
+
+    @property
+    def is_pushdown_enabled(self) -> bool:
+        """Convenience accessor for checking whether any pushdown operations are enabled."""
+        pushdown_state = self.pushdown_state
+        if pushdown_state is PredicatePushdownState.DISABLED:
+            return False
+        elif pushdown_state is PredicatePushdownState.FULLY_ENABLED:
+            return True
+        elif pushdown_state is PredicatePushdownState.ENABLED_FOR_TIME_RANGE_ONLY:
+            return True
+        else:
+            return assert_values_exhausted(pushdown_state)
+
+    @staticmethod
+    def with_time_range_constraint(
+        original_pushdown_params: PredicatePushdownParameters, time_range_constraint: Optional[TimeRangeConstraint]
+    ) -> PredicatePushdownParameters:
+        """Factory method for overriding the time range constraint value in a given set of pushdown parameters.
+
+        This allows for crude updates to the core time range constraint, including selectively disabling time range
+        predicate pushdown in certain sub-branches of the dataflow plan, such as in complex cases involving time spine
+        joins and cumulative metrics.
+        """
+        if original_pushdown_params.is_pushdown_enabled:
+            return PredicatePushdownParameters(
+                time_range_constraint=time_range_constraint,
+            )
+        else:
+            return original_pushdown_params
+
+    @staticmethod
+    def with_pushdown_disabled() -> PredicatePushdownParameters:
+        """Factory method for disabling predicate pushdown for all parameter types.
+
+        This is useful in cases where a path branches and pushdown should be disabled in one branch while the
+        other may remain eligible. For example, a join linkage where one side of the join contains an unsupported
+        configuration might send a disabled copy of the pushdown parameters down that path while retaining the
+        potential for using another path.
+ """ + return PredicatePushdownParameters(time_range_constraint=None, pushdown_state=PredicatePushdownState.DISABLED) class PreJoinNodeProcessor: diff --git a/tests_metricflow/dataflow/builder/test_predicate_pushdown.py b/tests_metricflow/dataflow/builder/test_predicate_pushdown.py new file mode 100644 index 0000000000..397b61a6e2 --- /dev/null +++ b/tests_metricflow/dataflow/builder/test_predicate_pushdown.py @@ -0,0 +1,54 @@ +from __future__ import annotations + +import dataclasses + +import pytest +from metricflow_semantics.filters.time_constraint import TimeRangeConstraint + +from metricflow.plan_conversion.node_processor import PredicatePushdownParameters, PredicatePushdownState + + +@pytest.fixture +def all_pushdown_params() -> PredicatePushdownParameters: + """Tests a valid configuration with all predicate properties set and pushdown fully enabled.""" + params = PredicatePushdownParameters( + time_range_constraint=TimeRangeConstraint.all_time(), pushdown_state=PredicatePushdownState.FULLY_ENABLED + ) + predicate_property_names = { + field.name for field in dataclasses.fields(params) if field.metadata.get(params._PREDICATE_METADATA_KEY) + } + predicate_properties = { + name: value for name, value in dataclasses.asdict(params).items() if name in predicate_property_names + } + assert all(value is not None for value in predicate_properties.values()), ( + "All predicate properties in this pushdown param instance should be set to something other than None. Found " + f"one or more None values in property map: {predicate_properties}" + ) + return params + + +def test_time_range_pushdown_enabled_states(all_pushdown_params: PredicatePushdownParameters) -> None: + """Tests pushdown enabled check for time range pushdown operations.""" + time_range_only_params = PredicatePushdownParameters( + time_range_constraint=TimeRangeConstraint.all_time(), + pushdown_state=PredicatePushdownState.ENABLED_FOR_TIME_RANGE_ONLY, + ) + + enabled_states = { + "fully enabled": all_pushdown_params.is_pushdown_enabled, + "enabled for time range only": time_range_only_params.is_pushdown_enabled, + } + + assert all(list(enabled_states.values())), ( + "Expected pushdown to be enabled for pushdown params with time range constraint and global pushdown enabled, " + f"but some params returned False for is_pushdown_enabled.\nPushdown enabled states: {enabled_states}\n" + f"All params: {all_pushdown_params}\nTime range only params: {time_range_only_params}" + ) + + +def test_invalid_disabled_pushdown_params() -> None: + """Tests checks for invalid param configuration on disabled pushdown parameters.""" + with pytest.raises(AssertionError, match="Disabled pushdown parameters cannot have properties set"): + PredicatePushdownParameters( + pushdown_state=PredicatePushdownState.DISABLED, time_range_constraint=TimeRangeConstraint.all_time() + )