diff --git a/metricflow/dataflow/builder/dataflow_plan_builder.py b/metricflow/dataflow/builder/dataflow_plan_builder.py index f6db45185a..c2ab75a641 100644 --- a/metricflow/dataflow/builder/dataflow_plan_builder.py +++ b/metricflow/dataflow/builder/dataflow_plan_builder.py @@ -245,8 +245,8 @@ def _build_aggregated_conversion_node( # Due to other outstanding issues with conversion metric filters, we disable predicate # pushdown for any filter parameter set that is not part of the original time range constraint # implementation. - disabled_pushdown_parameters = PredicatePushdownState.with_pushdown_disabled() - time_range_only_pushdown_parameters = PredicatePushdownState( + disabled_pushdown_state = PredicatePushdownState.with_pushdown_disabled() + time_range_only_pushdown_state = PredicatePushdownState( time_range_constraint=predicate_pushdown_state.time_range_constraint, pushdown_enabled_types=frozenset([PredicateInputType.TIME_RANGE_CONSTRAINT]), ) @@ -258,13 +258,13 @@ def _build_aggregated_conversion_node( ) base_measure_recipe = self._find_dataflow_recipe( measure_spec_properties=self._build_measure_spec_properties([base_measure_spec.measure_spec]), - predicate_pushdown_state=time_range_only_pushdown_parameters, + predicate_pushdown_state=time_range_only_pushdown_state, linkable_spec_set=base_required_linkable_specs, ) logger.info(f"Recipe for base measure aggregation:\n{mf_pformat(base_measure_recipe)}") conversion_measure_recipe = self._find_dataflow_recipe( measure_spec_properties=self._build_measure_spec_properties([conversion_measure_spec.measure_spec]), - predicate_pushdown_state=disabled_pushdown_parameters, + predicate_pushdown_state=disabled_pushdown_state, linkable_spec_set=LinkableSpecSet(), ) logger.info(f"Recipe for conversion measure aggregation:\n{mf_pformat(conversion_measure_recipe)}") @@ -281,7 +281,7 @@ def _build_aggregated_conversion_node( aggregated_base_measure_node = self.build_aggregated_measure( metric_input_measure_spec=base_measure_spec, queried_linkable_specs=queried_linkable_specs, - predicate_pushdown_state=time_range_only_pushdown_parameters, + predicate_pushdown_state=time_range_only_pushdown_state, ) # Build unaggregated conversions source node @@ -357,7 +357,7 @@ def _build_aggregated_conversion_node( aggregated_conversions_node = self.build_aggregated_measure( metric_input_measure_spec=conversion_measure_spec, queried_linkable_specs=queried_linkable_specs, - predicate_pushdown_state=disabled_pushdown_parameters, + predicate_pushdown_state=disabled_pushdown_state, measure_recipe=recipe_with_join_conversion_source_node, ) diff --git a/metricflow/plan_conversion/node_processor.py b/metricflow/plan_conversion/node_processor.py index 6cecd50021..da471f4ff0 100644 --- a/metricflow/plan_conversion/node_processor.py +++ b/metricflow/plan_conversion/node_processor.py @@ -69,7 +69,7 @@ class PredicateInputType(Enum): scenario where we only allow time range updates, we must do careful overriding of other pushdown properties. This also allows us to disable pushdown for things like time dimension filters in cases where we might - accidental censor input data. + accidentally censor input data. """ CATEGORICAL_DIMENSION = "categorical_dimension" @@ -80,7 +80,16 @@ class PredicateInputType(Enum): @dataclasses.dataclass(frozen=True) class PredicatePushdownState: - """Container class for managing information about whether and how to do filter predicate pushdown. + """Container class for maintaining state information relevant for predicate pushdown. + + This broadly tracks two related items: + 1. Filter predicates collected during the process of constructing a dataflow plan + 2. Predicate types eligible for pushdown + + The former may be updated as things like time constraints get altered or metric and measure filters are + added to the query filters. + The latter may be updated based on query configuration, like if a cumulative metric is added to the plan + there may be changes to what sort of predicate pushdown operations are supported. The time_range_constraint property holds the time window for setting up a time range filter expression. """ @@ -117,9 +126,9 @@ def __post_init__(self) -> None: ) if self.is_pushdown_disabled: - # TODO: Include where filter specs when they are added to this class + # TODO: Include where filter specs when they are added to this class and check state -> predicate pairs assert self.time_range_constraint is None, ( - "Invalid pushdown parameter configuration! Disabled pushdown parameters cannot have properties " + "Invalid pushdown state configuration! Disabled pushdown state objects cannot have properties " "set that may lead to improper access and use in other contexts, as that can lead to unintended " "filtering operations in cases where these properties are accessed without appropriate checks against " "pushdown configuration. The following properties should all have None values:\n" @@ -145,7 +154,7 @@ def is_pushdown_enabled_for_time_range_constraint(self) -> bool: def with_time_range_constraint( original_pushdown_state: PredicatePushdownState, time_range_constraint: TimeRangeConstraint ) -> PredicatePushdownState: - """Factory method for adding or updating a time range constraint input to a set of pushdown parameters. + """Factory method for updating a pushdown state with a time range constraint. This allows for temporarily overriding a time range constraint with an adjusted one, or enabling a time range constraint filter if one becomes available mid-stream during dataflow plan construction. @@ -161,7 +170,7 @@ def with_time_range_constraint( def without_time_range_constraint( original_pushdown_state: PredicatePushdownState, ) -> PredicatePushdownState: - """Factory method for removing the time range constraint, if any, from the given set of pushdown parameters.""" + """Factory method for updating pushdown state to bypass time range constraints.""" pushdown_enabled_types = original_pushdown_state.pushdown_enabled_types.difference( {PredicateInputType.TIME_RANGE_CONSTRAINT} ) @@ -169,7 +178,7 @@ def without_time_range_constraint( @staticmethod def with_pushdown_disabled() -> PredicatePushdownState: - """Factory method for disabling predicate pushdown for all parameter types. + """Factory method for configuring a disabled predicate pushdown state. This is useful in cases where there is a branched path where pushdown should be disabled in one branch while the other may remain eligible. For example, a join linkage where one side of the join contains an unsupported diff --git a/tests_metricflow/dataflow/builder/test_predicate_pushdown.py b/tests_metricflow/dataflow/builder/test_predicate_pushdown.py index cf2a2cf991..8969e49384 100644 --- a/tests_metricflow/dataflow/builder/test_predicate_pushdown.py +++ b/tests_metricflow/dataflow/builder/test_predicate_pushdown.py @@ -7,7 +7,7 @@ @pytest.fixture -def all_pushdown_state() -> PredicatePushdownState: +def fully_enabled_pushdown_state() -> PredicatePushdownState: """Tests a valid configuration with all predicate properties set and pushdown fully enabled.""" params = PredicatePushdownState( time_range_constraint=TimeRangeConstraint.all_time(), @@ -15,26 +15,28 @@ def all_pushdown_state() -> PredicatePushdownState: return params -def test_time_range_pushdown_enabled_states(all_pushdown_state: PredicatePushdownState) -> None: +def test_time_range_pushdown_enabled_states(fully_enabled_pushdown_state: PredicatePushdownState) -> None: """Tests pushdown enabled check for time range pushdown operations.""" - time_range_only_params = PredicatePushdownState( + time_range_only_state = PredicatePushdownState( time_range_constraint=TimeRangeConstraint.all_time(), pushdown_enabled_types=frozenset([PredicateInputType.TIME_RANGE_CONSTRAINT]), ) enabled_states = { - "fully enabled": all_pushdown_state.is_pushdown_enabled_for_time_range_constraint, - "enabled for time range only": time_range_only_params.is_pushdown_enabled_for_time_range_constraint, + "fully enabled": fully_enabled_pushdown_state.is_pushdown_enabled_for_time_range_constraint, + "enabled for time range only": time_range_only_state.is_pushdown_enabled_for_time_range_constraint, } assert all(list(enabled_states.values())), ( - "Expected pushdown to be enabled for pushdown params with time range constraint and global pushdown enabled, " - f"but some params returned False for is_pushdown_enabled.\nPushdown enabled states: {enabled_states}\n" - f"All params: {all_pushdown_state}\nTime range only params: {time_range_only_params}" + "Expected pushdown to be enabled for pushdown state with time range constraint and global pushdown enabled, " + "but some states returned False for is_pushdown_enabled_for_time_range_constraints.\n" + f"Pushdown enabled states: {enabled_states}\n" + f"Fully enabled state: {fully_enabled_pushdown_state}\n" + f"Time range only state: {time_range_only_state}" ) def test_invalid_disabled_pushdown_state() -> None: """Tests checks for invalid param configuration on disabled pushdown parameters.""" - with pytest.raises(AssertionError, match="Disabled pushdown parameters cannot have properties set"): + with pytest.raises(AssertionError, match="Disabled pushdown state objects cannot have properties set"): PredicatePushdownState(time_range_constraint=TimeRangeConstraint.all_time(), pushdown_enabled_types=frozenset())