From d4bb59c6cc01e2172b60bdad9616dfb3025264b0 Mon Sep 17 00:00:00 2001
From: Thomas Lento
Date: Thu, 23 May 2024 17:46:43 -0700
Subject: [PATCH] Use pushdown params to disable time range constraint pushdown (#1216)

The predicate pushdown operations for time range constraints currently rely on None/not-None state to determine whether or not to push the ConstrainTimeRange filter node to the source. With the switch to pushdown params, this None/not-None state gets tenuous, as future updates will need to do things like manage time range constraints, other time dimension filters, and categorical dimension (and entity) filters, and relying on externalizing None/not-None combinations for these various filter types will be challenging.

This change encapsulates the enabling and disabling of time range constraints inside of PredicatePushdownParams. At the moment, in order to maintain the existing behavior, we simply internalize the None/not-None behavior for time range constraints. This will also allow us to easily retain pushdown processing for categorical dimensions even when time constraint filters should not be applied, and gradually centralize those controls as we streamline the callsites.

There is added complexity to this change because of two things:

1. Time constraint updating is scattered around in the DataflowPlanBuilder
2. Conversion metrics currently do predicate pushdown for time constraints

The first of these will be addressed later. Since we cannot reliably support general predicate pushdown for conversion metrics, we need to allow for a time-range-only pushdown operation. Today this is equivalent to pushdown enabled, but that will change shortly.
---
 .../dataflow/builder/dataflow_plan_builder.py |  49 +++++--
 metricflow/plan_conversion/node_processor.py  | 127 +++++++++++++++++-
 .../builder/test_predicate_pushdown.py        |  42 ++++++
 3 files changed, 199 insertions(+), 19 deletions(-)
 create mode 100644 tests_metricflow/dataflow/builder/test_predicate_pushdown.py

diff --git a/metricflow/dataflow/builder/dataflow_plan_builder.py b/metricflow/dataflow/builder/dataflow_plan_builder.py
index 1177aa43e0..89de8b48d5 100644
--- a/metricflow/dataflow/builder/dataflow_plan_builder.py
+++ b/metricflow/dataflow/builder/dataflow_plan_builder.py
@@ -82,7 +82,11 @@ from metricflow.dataflow.nodes.write_to_table import WriteToResultTableNode
 from metricflow.dataflow.optimizer.dataflow_plan_optimizer import DataflowPlanOptimizer
 from metricflow.dataset.dataset_classes import DataSet
-from metricflow.plan_conversion.node_processor import PredicatePushdownParameters, PreJoinNodeProcessor
+from metricflow.plan_conversion.node_processor import (
+    PredicateInputType,
+    PredicatePushdownParameters,
+    PreJoinNodeProcessor,
+)
 from metricflow.sql.sql_table import SqlTable
 
 logger = logging.getLogger(__name__)
@@ -237,6 +241,16 @@ def _build_aggregated_conversion_node(
         constant_properties: Optional[Sequence[ConstantPropertyInput]] = None,
     ) -> DataflowPlanNode:
         """Builds a node that contains aggregated values of conversions and opportunities."""
+        # Pushdown parameters vary with conversion metrics due to the way the time joins are applied.
+        # Due to other outstanding issues with conversion metric filters, we disable predicate
+        # pushdown for any filter parameter set that is not part of the original time range constraint
+        # implementation.
+        disabled_pushdown_parameters = PredicatePushdownParameters.with_pushdown_disabled()
+        time_range_only_pushdown_parameters = PredicatePushdownParameters(
+            time_range_constraint=predicate_pushdown_params.time_range_constraint,
+            pushdown_enabled_types=frozenset([PredicateInputType.TIME_RANGE_CONSTRAINT]),
+        )
+
         # Build measure recipes
         base_required_linkable_specs, _ = self.__get_required_and_extraneous_linkable_specs(
             queried_linkable_specs=queried_linkable_specs,
@@ -244,14 +258,13 @@ def _build_aggregated_conversion_node(
         )
         base_measure_recipe = self._find_dataflow_recipe(
             measure_spec_properties=self._build_measure_spec_properties([base_measure_spec.measure_spec]),
-            predicate_pushdown_params=predicate_pushdown_params,
+            predicate_pushdown_params=time_range_only_pushdown_parameters,
             linkable_spec_set=base_required_linkable_specs,
         )
         logger.info(f"Recipe for base measure aggregation:\n{mf_pformat(base_measure_recipe)}")
         conversion_measure_recipe = self._find_dataflow_recipe(
             measure_spec_properties=self._build_measure_spec_properties([conversion_measure_spec.measure_spec]),
-            # TODO - Pushdown: Evaluate the potential for applying time constraints and other predicates for conversion
-            predicate_pushdown_params=PredicatePushdownParameters(time_range_constraint=None),
+            predicate_pushdown_params=disabled_pushdown_parameters,
             linkable_spec_set=LinkableSpecSet(),
         )
         logger.info(f"Recipe for conversion measure aggregation:\n{mf_pformat(conversion_measure_recipe)}")
@@ -268,7 +281,7 @@ def _build_aggregated_conversion_node(
         aggregated_base_measure_node = self.build_aggregated_measure(
             metric_input_measure_spec=base_measure_spec,
             queried_linkable_specs=queried_linkable_specs,
-            predicate_pushdown_params=predicate_pushdown_params,
+            predicate_pushdown_params=time_range_only_pushdown_parameters,
         )
 
         # Build unaggregated conversions source node
@@ -337,10 +350,14 @@ def _build_aggregated_conversion_node(
             required_local_linkable_specs=base_measure_recipe.required_local_linkable_specs,
             join_linkable_instances_recipes=base_measure_recipe.join_linkable_instances_recipes,
         )
+        # TODO: Refine conversion metric configuration to fit into the standard dataflow plan building model
+        # In this case we override the measure recipe, which currently results in us bypassing predicate pushdown.
+        # Rather than relying on happenstance in the way the code is laid out, we also explicitly disable
+        # predicate pushdown until we are ready to fully support it for conversion metrics
         aggregated_conversions_node = self.build_aggregated_measure(
             metric_input_measure_spec=conversion_measure_spec,
             queried_linkable_specs=queried_linkable_specs,
-            predicate_pushdown_params=predicate_pushdown_params,
+            predicate_pushdown_params=disabled_pushdown_parameters,
             measure_recipe=recipe_with_join_conversion_source_node,
         )
 
@@ -492,11 +509,10 @@ def _build_derived_metric_output_node(
             if not metric_spec.has_time_offset:
                 filter_specs.extend(metric_spec.filter_specs)
 
-            # TODO - Pushdown: use parameters to disable pushdown operations instead of clobbering the constraints
             metric_pushdown_params = (
                 predicate_pushdown_params
                 if not metric_spec.has_time_offset
-                else PredicatePushdownParameters(time_range_constraint=None)
+                else PredicatePushdownParameters.with_pushdown_disabled()
             )
 
             parent_nodes.append(
@@ -867,7 +883,10 @@ def _find_dataflow_recipe(
             node_data_set_resolver=self._node_data_set_resolver,
         )
         # TODO - Pushdown: Encapsulate this in the node processor
-        if predicate_pushdown_params.time_range_constraint:
+        if (
+            predicate_pushdown_params.is_pushdown_enabled_for_time_range_constraint
+            and predicate_pushdown_params.time_range_constraint
+        ):
             candidate_nodes_for_left_side_of_join = list(
                 node_processor.add_time_range_constraint(
                     source_nodes=candidate_nodes_for_left_side_of_join,
@@ -1298,15 +1317,21 @@ def _build_aggregated_measure_from_measure_source_node(
             + indent(f"\nmeasure_specs:\n{mf_pformat([measure_spec])}")
             + indent(f"\nevaluation:\n{mf_pformat(required_linkable_specs)}")
         )
-
-        # TODO - Pushdown: Update this to be more robust to additional pushdown parameters
         measure_time_constraint = (
             (cumulative_metric_adjusted_time_constraint or predicate_pushdown_params.time_range_constraint)
             # If joining to time spine for time offset, constraints will be applied after that join.
             if not before_aggregation_time_spine_join_description
             else None
         )
-        measure_pushdown_params = PredicatePushdownParameters(time_range_constraint=measure_time_constraint)
+        if measure_time_constraint is None:
+            measure_pushdown_params = PredicatePushdownParameters.without_time_range_constraint(
+                predicate_pushdown_params
+            )
+        else:
+            measure_pushdown_params = PredicatePushdownParameters.with_time_range_constraint(
+                predicate_pushdown_params, time_range_constraint=measure_time_constraint
+            )
+
         find_recipe_start_time = time.time()
         measure_recipe = self._find_dataflow_recipe(
             measure_spec_properties=measure_properties,
diff --git a/metricflow/plan_conversion/node_processor.py b/metricflow/plan_conversion/node_processor.py
index e21649e110..6463f7d3cf 100644
--- a/metricflow/plan_conversion/node_processor.py
+++ b/metricflow/plan_conversion/node_processor.py
@@ -1,9 +1,11 @@
 from __future__ import annotations
 
+import dataclasses
 import logging
-from dataclasses import dataclass
-from typing import List, Optional, Sequence, Set
+from enum import Enum
+from typing import FrozenSet, List, Optional, Sequence, Set
 
+from dbt_semantic_interfaces.enum_extension import assert_values_exhausted
 from dbt_semantic_interfaces.references import EntityReference, TimeDimensionReference
 from metricflow_semantics.filters.time_constraint import TimeRangeConstraint
 from metricflow_semantics.mf_logging.pretty_print import mf_pformat
@@ -28,7 +30,7 @@
 logger = logging.getLogger(__name__)
 
 
-@dataclass(frozen=True)
+@dataclasses.dataclass(frozen=True)
 class MultiHopJoinCandidateLineage:
     """Describes how the multi-hop join candidate was formed.
 
@@ -48,7 +50,7 @@ class MultiHopJoinCandidateLineage:
     join_second_node_by_entity: LinklessEntitySpec
 
 
-@dataclass(frozen=True)
+@dataclasses.dataclass(frozen=True)
 class MultiHopJoinCandidate:
     """A candidate node containing linkable specs that is join of other nodes. It's used to resolve multi-hop queries.
 
@@ -59,14 +61,125 @@ class MultiHopJoinCandidate:
     lineage: MultiHopJoinCandidateLineage
 
 
-@dataclass(frozen=True)
+class PredicateInputType(Enum):
+    """Enumeration of predicate input types we may encounter in where filters.
+
+    This is primarily used for describing when predicate pushdown may operate in a dataflow plan, and is necessary
+    for holistic checks against the set of potentially enabled pushdown operations. For example, in the scenario
+    where we only allow time range updates, we must do careful overriding of other pushdown properties.
+
+    This also allows us to disable pushdown for things like time dimension filters in cases where we might
+    accidentally censor input data.
+ """ + + CATEGORICAL_DIMENSION = "categorical_dimension" + ENTITY = "entity" + TIME_DIMENSION = "time_dimension" + TIME_RANGE_CONSTRAINT = "time_range_constraint" + + +@dataclasses.dataclass(frozen=True) class PredicatePushdownParameters: - """Container class for managing filter predicate pushdown. + """Container class for managing information about whether and how to do filter predicate pushdown. - Stores time constraint information for applying pre-join time filters. + The time_range_constraint property holds the time window for setting up a time range filter expression. """ time_range_constraint: Optional[TimeRangeConstraint] + pushdown_enabled_types: FrozenSet[PredicateInputType] = frozenset([PredicateInputType.TIME_RANGE_CONSTRAINT]) + + def __post_init__(self) -> None: + """Validation to ensure pushdown properties are configured correctly. + + In particular, this asserts that cases where pushdown is disabled cannot leak pushdown operations via + outside property access - if pushdown is disabled, no further pushdown operations of any kind are allowed + on that particular code branch. It also asserts that unsupported pushdown scenarios are not configured. + """ + invalid_types: Set[PredicateInputType] = set() + + for input_type in self.pushdown_enabled_types: + if ( + input_type is PredicateInputType.CATEGORICAL_DIMENSION + or input_type is PredicateInputType.ENTITY + or input_type is PredicateInputType.TIME_DIMENSION + ): + invalid_types.add(input_type) + elif input_type is PredicateInputType.TIME_RANGE_CONSTRAINT: + continue + else: + assert_values_exhausted(input_type) + + assert len(invalid_types) == 0, ( + "Unsupported predicate input type found in pushdown state configuration! We currently only support " + "predicate pushdown for a subset of possible predicate input types (i.e., types of semantic manifest " + "elements, such as entities and time dimensions, referenced in filter predicates), but this was enabled " + f"for {self.pushdown_enabled_types}, which includes the following invalid types: {invalid_types}." + ) + + if self.is_pushdown_disabled: + # TODO: Include where filter specs when they are added to this class + assert self.time_range_constraint is None, ( + "Invalid pushdown parameter configuration! Disabled pushdown parameters cannot have properties " + "set that may lead to improper access and use in other contexts, as that can lead to unintended " + "filtering operations in cases where these properties are accessed without appropriate checks against " + "pushdown configuration. The following properties should all have None values:\n" + f"time_range_constraint: {self.time_range_constraint}" + ) + + @property + def is_pushdown_disabled(self) -> bool: + """Convenience accessor for checking if pushdown should always be skipped.""" + return len(self.pushdown_enabled_types) == 0 + + @property + def is_pushdown_enabled_for_time_range_constraint(self) -> bool: + """Convenience accessor for checking if pushdown is enabled for time range constraints. + + Note: this time range enabled state is a backwards compatibility shim for use with conversion metrics while + we determine how best to support predicate pushdown for conversion metrics. It may have longer term utility, + but ideally we'd collapse this with the more general time dimension filter input scenarios. 
+ """ + return PredicateInputType.TIME_RANGE_CONSTRAINT in self.pushdown_enabled_types + + @staticmethod + def with_time_range_constraint( + original_pushdown_params: PredicatePushdownParameters, time_range_constraint: TimeRangeConstraint + ) -> PredicatePushdownParameters: + """Factory method for adding or updating a time range constraint input to a set of pushdown parameters. + + This allows for temporarily overriding a time range constraint with an adjusted one, or enabling a time + range constraint filter if one becomes available mid-stream during dataflow plan construction. + """ + pushdown_enabled_types = original_pushdown_params.pushdown_enabled_types.union( + {PredicateInputType.TIME_RANGE_CONSTRAINT} + ) + return PredicatePushdownParameters( + time_range_constraint=time_range_constraint, pushdown_enabled_types=pushdown_enabled_types + ) + + @staticmethod + def without_time_range_constraint( + original_pushdown_params: PredicatePushdownParameters, + ) -> PredicatePushdownParameters: + """Factory method for removing the time range constraint, if any, from the given set of pushdown parameters.""" + pushdown_enabled_types = original_pushdown_params.pushdown_enabled_types.difference( + {PredicateInputType.TIME_RANGE_CONSTRAINT} + ) + return PredicatePushdownParameters(time_range_constraint=None, pushdown_enabled_types=pushdown_enabled_types) + + @staticmethod + def with_pushdown_disabled() -> PredicatePushdownParameters: + """Factory method for disabling predicate pushdown for all parameter types. + + This is useful in cases where there is a branched path where pushdown should be disabled in one branch while the + other may remain eligible. For example, a join linkage where one side of the join contains an unsupported + configuration might send a disabled copy of the pushdown parameters down that path while retaining the potential + for using another path. 
+ """ + return PredicatePushdownParameters( + time_range_constraint=None, + pushdown_enabled_types=frozenset(), + ) class PreJoinNodeProcessor: diff --git a/tests_metricflow/dataflow/builder/test_predicate_pushdown.py b/tests_metricflow/dataflow/builder/test_predicate_pushdown.py new file mode 100644 index 0000000000..7493745f17 --- /dev/null +++ b/tests_metricflow/dataflow/builder/test_predicate_pushdown.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +import pytest +from metricflow_semantics.filters.time_constraint import TimeRangeConstraint + +from metricflow.plan_conversion.node_processor import PredicateInputType, PredicatePushdownParameters + + +@pytest.fixture +def all_pushdown_params() -> PredicatePushdownParameters: + """Tests a valid configuration with all predicate properties set and pushdown fully enabled.""" + params = PredicatePushdownParameters( + time_range_constraint=TimeRangeConstraint.all_time(), + ) + return params + + +def test_time_range_pushdown_enabled_states(all_pushdown_params: PredicatePushdownParameters) -> None: + """Tests pushdown enabled check for time range pushdown operations.""" + time_range_only_params = PredicatePushdownParameters( + time_range_constraint=TimeRangeConstraint.all_time(), + pushdown_enabled_types=frozenset([PredicateInputType.TIME_RANGE_CONSTRAINT]), + ) + + enabled_states = { + "fully enabled": all_pushdown_params.is_pushdown_enabled_for_time_range_constraint, + "enabled for time range only": time_range_only_params.is_pushdown_enabled_for_time_range_constraint, + } + + assert all(list(enabled_states.values())), ( + "Expected pushdown to be enabled for pushdown params with time range constraint and global pushdown enabled, " + f"but some params returned False for is_pushdown_enabled.\nPushdown enabled states: {enabled_states}\n" + f"All params: {all_pushdown_params}\nTime range only params: {time_range_only_params}" + ) + + +def test_invalid_disabled_pushdown_params() -> None: + """Tests checks for invalid param configuration on disabled pushdown parameters.""" + with pytest.raises(AssertionError, match="Disabled pushdown parameters cannot have properties set"): + PredicatePushdownParameters( + time_range_constraint=TimeRangeConstraint.all_time(), pushdown_enabled_types=frozenset() + )
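Illustrative usage (not part of the patch): a minimal sketch of how the factory methods and enabled-state checks added to node_processor.py compose, using only the classes and call signatures shown in the diff above.

    from metricflow_semantics.filters.time_constraint import TimeRangeConstraint

    from metricflow.plan_conversion.node_processor import PredicatePushdownParameters

    # Default construction enables pushdown for time range constraints only.
    params = PredicatePushdownParameters(time_range_constraint=TimeRangeConstraint.all_time())
    assert params.is_pushdown_enabled_for_time_range_constraint

    # Swap in an adjusted constraint (e.g. a cumulative-metric-adjusted window) without
    # touching the rest of the pushdown state.
    adjusted = PredicatePushdownParameters.with_time_range_constraint(
        params, time_range_constraint=TimeRangeConstraint.all_time()
    )
    assert adjusted.is_pushdown_enabled_for_time_range_constraint

    # Drop the time range constraint, e.g. when it will be applied after a time spine join.
    no_time_range = PredicatePushdownParameters.without_time_range_constraint(params)
    assert no_time_range.time_range_constraint is None

    # Disable pushdown entirely, e.g. for the conversion measure input.
    disabled = PredicatePushdownParameters.with_pushdown_disabled()
    assert disabled.is_pushdown_disabled

The factory methods keep the None/not-None handling internal to the parameter object, which is the behavior change the commit message describes.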