diff --git a/metricflow/dataflow/builder/dataflow_plan_builder.py b/metricflow/dataflow/builder/dataflow_plan_builder.py index 7e6b7a3bfe..d5e1b71b24 100644 --- a/metricflow/dataflow/builder/dataflow_plan_builder.py +++ b/metricflow/dataflow/builder/dataflow_plan_builder.py @@ -83,9 +83,9 @@ from metricflow.dataflow.optimizer.dataflow_plan_optimizer import DataflowPlanOptimizer from metricflow.dataset.dataset_classes import DataSet from metricflow.plan_conversion.node_processor import ( + PredicateInputType, PredicatePushdownParameters, PreJoinNodeProcessor, - PushdownPredicateInputType, ) from metricflow.sql.sql_table import SqlTable @@ -248,7 +248,7 @@ def _build_aggregated_conversion_node( disabled_pushdown_parameters = PredicatePushdownParameters.with_pushdown_disabled() time_range_only_pushdown_parameters = PredicatePushdownParameters( time_range_constraint=predicate_pushdown_params.time_range_constraint, - pushdown_enabled_types=frozenset([PushdownPredicateInputType.TIME_RANGE_CONSTRAINT]), + pushdown_enabled_types=frozenset([PredicateInputType.TIME_RANGE_CONSTRAINT]), ) # Build measure recipes diff --git a/metricflow/plan_conversion/node_processor.py b/metricflow/plan_conversion/node_processor.py index b664e1ad2d..6463f7d3cf 100644 --- a/metricflow/plan_conversion/node_processor.py +++ b/metricflow/plan_conversion/node_processor.py @@ -61,15 +61,15 @@ class MultiHopJoinCandidate: lineage: MultiHopJoinCandidateLineage -class PushdownPredicateInputType(Enum): - """Enumeration of constraint states describing when predicate pushdown may operate in a dataflow plan. +class PredicateInputType(Enum): + """Enumeration of predicate input types we may encounter in where filters. - This is necessary for holistic checks against the set of potentially enabled pushdown operations, because the - scenario where we only allow time range updates requires careful overriding of other pushdown properties. + This is primarily used for describing when predicate pushdown may operate in a dataflow plan, and is necessary + for holistic checks against the set of potentially enabled pushdown operations. For example, in the scenario + scenario where we only allow time range updates, we must do careful overriding of other pushdown properties. - Note: the time_range_only state is a backwards compatibility shim for use with conversion metrics while - we determine how best to support predicate pushdown for conversion metrics. It may have longer term utility, - but ideally we'd collapse this into a single enabled/disabled boolean property. + This also allows us to disable pushdown for things like time dimension filters in cases where we might + accidental censor input data. """ CATEGORICAL_DIMENSION = "categorical_dimension" @@ -86,9 +86,7 @@ class PredicatePushdownParameters: """ time_range_constraint: Optional[TimeRangeConstraint] - pushdown_enabled_types: FrozenSet[PushdownPredicateInputType] = frozenset( - [PushdownPredicateInputType.TIME_RANGE_CONSTRAINT] - ) + pushdown_enabled_types: FrozenSet[PredicateInputType] = frozenset([PredicateInputType.TIME_RANGE_CONSTRAINT]) def __post_init__(self) -> None: """Validation to ensure pushdown properties are configured correctly. @@ -97,16 +95,16 @@ def __post_init__(self) -> None: outside property access - if pushdown is disabled, no further pushdown operations of any kind are allowed on that particular code branch. It also asserts that unsupported pushdown scenarios are not configured. """ - invalid_types: Set[PushdownPredicateInputType] = set() + invalid_types: Set[PredicateInputType] = set() for input_type in self.pushdown_enabled_types: if ( - input_type is PushdownPredicateInputType.CATEGORICAL_DIMENSION - or input_type is PushdownPredicateInputType.ENTITY - or input_type is PushdownPredicateInputType.TIME_DIMENSION + input_type is PredicateInputType.CATEGORICAL_DIMENSION + or input_type is PredicateInputType.ENTITY + or input_type is PredicateInputType.TIME_DIMENSION ): invalid_types.add(input_type) - elif input_type is PushdownPredicateInputType.TIME_RANGE_CONSTRAINT: + elif input_type is PredicateInputType.TIME_RANGE_CONSTRAINT: continue else: assert_values_exhausted(input_type) @@ -135,8 +133,13 @@ def is_pushdown_disabled(self) -> bool: @property def is_pushdown_enabled_for_time_range_constraint(self) -> bool: - """Convenience accessor for checking if pushdown is enabled for time range constraints.""" - return PushdownPredicateInputType.TIME_RANGE_CONSTRAINT in self.pushdown_enabled_types + """Convenience accessor for checking if pushdown is enabled for time range constraints. + + Note: this time range enabled state is a backwards compatibility shim for use with conversion metrics while + we determine how best to support predicate pushdown for conversion metrics. It may have longer term utility, + but ideally we'd collapse this with the more general time dimension filter input scenarios. + """ + return PredicateInputType.TIME_RANGE_CONSTRAINT in self.pushdown_enabled_types @staticmethod def with_time_range_constraint( @@ -148,7 +151,7 @@ def with_time_range_constraint( range constraint filter if one becomes available mid-stream during dataflow plan construction. """ pushdown_enabled_types = original_pushdown_params.pushdown_enabled_types.union( - {PushdownPredicateInputType.TIME_RANGE_CONSTRAINT} + {PredicateInputType.TIME_RANGE_CONSTRAINT} ) return PredicatePushdownParameters( time_range_constraint=time_range_constraint, pushdown_enabled_types=pushdown_enabled_types @@ -160,7 +163,7 @@ def without_time_range_constraint( ) -> PredicatePushdownParameters: """Factory method for removing the time range constraint, if any, from the given set of pushdown parameters.""" pushdown_enabled_types = original_pushdown_params.pushdown_enabled_types.difference( - {PushdownPredicateInputType.TIME_RANGE_CONSTRAINT} + {PredicateInputType.TIME_RANGE_CONSTRAINT} ) return PredicatePushdownParameters(time_range_constraint=None, pushdown_enabled_types=pushdown_enabled_types) diff --git a/tests_metricflow/dataflow/builder/test_predicate_pushdown.py b/tests_metricflow/dataflow/builder/test_predicate_pushdown.py index cf45393816..7493745f17 100644 --- a/tests_metricflow/dataflow/builder/test_predicate_pushdown.py +++ b/tests_metricflow/dataflow/builder/test_predicate_pushdown.py @@ -3,7 +3,7 @@ import pytest from metricflow_semantics.filters.time_constraint import TimeRangeConstraint -from metricflow.plan_conversion.node_processor import PredicatePushdownParameters, PushdownPredicateInputType +from metricflow.plan_conversion.node_processor import PredicateInputType, PredicatePushdownParameters @pytest.fixture @@ -19,7 +19,7 @@ def test_time_range_pushdown_enabled_states(all_pushdown_params: PredicatePushdo """Tests pushdown enabled check for time range pushdown operations.""" time_range_only_params = PredicatePushdownParameters( time_range_constraint=TimeRangeConstraint.all_time(), - pushdown_enabled_types=frozenset([PushdownPredicateInputType.TIME_RANGE_CONSTRAINT]), + pushdown_enabled_types=frozenset([PredicateInputType.TIME_RANGE_CONSTRAINT]), ) enabled_states = {