Skip to content

Commit

Permalink
Straggling renames plus documentation updates
Browse files Browse the repository at this point in the history
  • Loading branch information
tlento committed May 24, 2024
1 parent ec3caf5 commit 508cfb9
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 22 deletions.
12 changes: 6 additions & 6 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,8 @@ def _build_aggregated_conversion_node(
# Due to other outstanding issues with conversion metric filters, we disable predicate
# pushdown for any filter parameter set that is not part of the original time range constraint
# implementation.
disabled_pushdown_parameters = PredicatePushdownState.with_pushdown_disabled()
time_range_only_pushdown_parameters = PredicatePushdownState(
disabled_pushdown_state = PredicatePushdownState.with_pushdown_disabled()
time_range_only_pushdown_state = PredicatePushdownState(
time_range_constraint=predicate_pushdown_state.time_range_constraint,
pushdown_enabled_types=frozenset([PredicateInputType.TIME_RANGE_CONSTRAINT]),
)
Expand All @@ -258,13 +258,13 @@ def _build_aggregated_conversion_node(
)
base_measure_recipe = self._find_dataflow_recipe(
measure_spec_properties=self._build_measure_spec_properties([base_measure_spec.measure_spec]),
predicate_pushdown_state=time_range_only_pushdown_parameters,
predicate_pushdown_state=time_range_only_pushdown_state,
linkable_spec_set=base_required_linkable_specs,
)
logger.info(f"Recipe for base measure aggregation:\n{mf_pformat(base_measure_recipe)}")
conversion_measure_recipe = self._find_dataflow_recipe(
measure_spec_properties=self._build_measure_spec_properties([conversion_measure_spec.measure_spec]),
predicate_pushdown_state=disabled_pushdown_parameters,
predicate_pushdown_state=disabled_pushdown_state,
linkable_spec_set=LinkableSpecSet(),
)
logger.info(f"Recipe for conversion measure aggregation:\n{mf_pformat(conversion_measure_recipe)}")
Expand All @@ -281,7 +281,7 @@ def _build_aggregated_conversion_node(
aggregated_base_measure_node = self.build_aggregated_measure(
metric_input_measure_spec=base_measure_spec,
queried_linkable_specs=queried_linkable_specs,
predicate_pushdown_state=time_range_only_pushdown_parameters,
predicate_pushdown_state=time_range_only_pushdown_state,
)

# Build unaggregated conversions source node
Expand Down Expand Up @@ -357,7 +357,7 @@ def _build_aggregated_conversion_node(
aggregated_conversions_node = self.build_aggregated_measure(
metric_input_measure_spec=conversion_measure_spec,
queried_linkable_specs=queried_linkable_specs,
predicate_pushdown_state=disabled_pushdown_parameters,
predicate_pushdown_state=disabled_pushdown_state,
measure_recipe=recipe_with_join_conversion_source_node,
)

Expand Down
23 changes: 16 additions & 7 deletions metricflow/plan_conversion/node_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class PredicateInputType(Enum):
scenario where we only allow time range updates, we must do careful overriding of other pushdown properties.
This also allows us to disable pushdown for things like time dimension filters in cases where we might
accidental censor input data.
accidentally censor input data.
"""

CATEGORICAL_DIMENSION = "categorical_dimension"
Expand All @@ -80,7 +80,16 @@ class PredicateInputType(Enum):

@dataclasses.dataclass(frozen=True)
class PredicatePushdownState:
"""Container class for managing information about whether and how to do filter predicate pushdown.
"""Container class for maintaining state information relevant for predicate pushdown.
This broadly tracks two related items:
1. Filter predicates collected during the process of constructing a dataflow plan
2. Predicate types eligible for pushdown
The former may be updated as things like time constraints get altered or metric and measure filters are
added to the query filters.
The latter may be updated based on query configuration, like if a cumulative metric is added to the plan
there may be changes to what sort of predicate pushdown operations are supported.
The time_range_constraint property holds the time window for setting up a time range filter expression.
"""
Expand Down Expand Up @@ -117,9 +126,9 @@ def __post_init__(self) -> None:
)

if self.is_pushdown_disabled:
# TODO: Include where filter specs when they are added to this class
# TODO: Include where filter specs when they are added to this class and check state -> predicate pairs
assert self.time_range_constraint is None, (
"Invalid pushdown parameter configuration! Disabled pushdown parameters cannot have properties "
"Invalid pushdown state configuration! Disabled pushdown state objects cannot have properties "
"set that may lead to improper access and use in other contexts, as that can lead to unintended "
"filtering operations in cases where these properties are accessed without appropriate checks against "
"pushdown configuration. The following properties should all have None values:\n"
Expand All @@ -145,7 +154,7 @@ def is_pushdown_enabled_for_time_range_constraint(self) -> bool:
def with_time_range_constraint(
original_pushdown_state: PredicatePushdownState, time_range_constraint: TimeRangeConstraint
) -> PredicatePushdownState:
"""Factory method for adding or updating a time range constraint input to a set of pushdown parameters.
"""Factory method for updating a pushdown state with a time range constraint.
This allows for temporarily overriding a time range constraint with an adjusted one, or enabling a time
range constraint filter if one becomes available mid-stream during dataflow plan construction.
Expand All @@ -161,15 +170,15 @@ def with_time_range_constraint(
def without_time_range_constraint(
original_pushdown_state: PredicatePushdownState,
) -> PredicatePushdownState:
"""Factory method for removing the time range constraint, if any, from the given set of pushdown parameters."""
"""Factory method for updating pushdown state to bypass time range constraints."""
pushdown_enabled_types = original_pushdown_state.pushdown_enabled_types.difference(
{PredicateInputType.TIME_RANGE_CONSTRAINT}
)
return PredicatePushdownState(time_range_constraint=None, pushdown_enabled_types=pushdown_enabled_types)

@staticmethod
def with_pushdown_disabled() -> PredicatePushdownState:
"""Factory method for disabling predicate pushdown for all parameter types.
"""Factory method for configuring a disabled predicate pushdown state.
This is useful in cases where there is a branched path where pushdown should be disabled in one branch while the
other may remain eligible. For example, a join linkage where one side of the join contains an unsupported
Expand Down
20 changes: 11 additions & 9 deletions tests_metricflow/dataflow/builder/test_predicate_pushdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,36 @@


@pytest.fixture
def all_pushdown_state() -> PredicatePushdownState:
def fully_enabled_pushdown_state() -> PredicatePushdownState:
"""Tests a valid configuration with all predicate properties set and pushdown fully enabled."""
params = PredicatePushdownState(
time_range_constraint=TimeRangeConstraint.all_time(),
)
return params


def test_time_range_pushdown_enabled_states(all_pushdown_state: PredicatePushdownState) -> None:
def test_time_range_pushdown_enabled_states(fully_enabled_pushdown_state: PredicatePushdownState) -> None:
"""Tests pushdown enabled check for time range pushdown operations."""
time_range_only_params = PredicatePushdownState(
time_range_only_state = PredicatePushdownState(
time_range_constraint=TimeRangeConstraint.all_time(),
pushdown_enabled_types=frozenset([PredicateInputType.TIME_RANGE_CONSTRAINT]),
)

enabled_states = {
"fully enabled": all_pushdown_state.is_pushdown_enabled_for_time_range_constraint,
"enabled for time range only": time_range_only_params.is_pushdown_enabled_for_time_range_constraint,
"fully enabled": fully_enabled_pushdown_state.is_pushdown_enabled_for_time_range_constraint,
"enabled for time range only": time_range_only_state.is_pushdown_enabled_for_time_range_constraint,
}

assert all(list(enabled_states.values())), (
"Expected pushdown to be enabled for pushdown params with time range constraint and global pushdown enabled, "
f"but some params returned False for is_pushdown_enabled.\nPushdown enabled states: {enabled_states}\n"
f"All params: {all_pushdown_state}\nTime range only params: {time_range_only_params}"
"Expected pushdown to be enabled for pushdown state with time range constraint and global pushdown enabled, "
"but some states returned False for is_pushdown_enabled_for_time_range_constraints.\n"
f"Pushdown enabled states: {enabled_states}\n"
f"Fully enabled state: {fully_enabled_pushdown_state}\n"
f"Time range only state: {time_range_only_state}"
)


def test_invalid_disabled_pushdown_state() -> None:
"""Tests checks for invalid param configuration on disabled pushdown parameters."""
with pytest.raises(AssertionError, match="Disabled pushdown parameters cannot have properties set"):
with pytest.raises(AssertionError, match="Disabled pushdown state objects cannot have properties set"):
PredicatePushdownState(time_range_constraint=TimeRangeConstraint.all_time(), pushdown_enabled_types=frozenset())

0 comments on commit 508cfb9

Please sign in to comment.