Skip to content

Commit

Permalink
Encapsulate time range constraint application in more general method (#…
Browse files Browse the repository at this point in the history
…1225)

The time range constraint is our currently supported predicate pushdown
operation. The specific application of a time range constraint is
inlined in the DataflowPlanBuilder. This moves it to a more general
function, which will make it easier for us to handle other predicate
pushdown states while keeping the conditional logic contained.

Note this is effectively a no-op as the new function is currently a
pass-through. This is borne out by the lack of changes in our snapshots,
several of which cover the time range node.
  • Loading branch information
tlento authored May 24, 2024
1 parent e338402 commit d995502
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 26 deletions.
11 changes: 4 additions & 7 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -882,16 +882,13 @@ def _find_dataflow_recipe(
semantic_model_lookup=self._semantic_model_lookup,
node_data_set_resolver=self._node_data_set_resolver,
)
# TODO - Pushdown: Encapsulate this in the node processor
if (
predicate_pushdown_state.is_pushdown_enabled_for_time_range_constraint
and predicate_pushdown_state.time_range_constraint
):

if predicate_pushdown_state.has_pushdown_potential:
candidate_nodes_for_left_side_of_join = list(
node_processor.add_time_range_constraint(
node_processor.apply_matching_filter_predicates(
source_nodes=candidate_nodes_for_left_side_of_join,
predicate_pushdown_state=predicate_pushdown_state,
metric_time_dimension_reference=self._metric_time_dimension_reference,
time_range_constraint=predicate_pushdown_state.time_range_constraint,
)
)

Expand Down
54 changes: 38 additions & 16 deletions metricflow/plan_conversion/node_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,30 +125,36 @@ def __post_init__(self) -> None:
f"for {self.pushdown_enabled_types}, which includes the following invalid types: {invalid_types}."
)

if self.is_pushdown_disabled:
# TODO: Include where filter specs when they are added to this class and check state -> predicate pairs
assert self.time_range_constraint is None, (
"Invalid pushdown state configuration! Disabled pushdown state objects cannot have properties "
"set that may lead to improper access and use in other contexts, as that can lead to unintended "
"filtering operations in cases where these properties are accessed without appropriate checks against "
"pushdown configuration. The following properties should all have None values:\n"
f"time_range_constraint: {self.time_range_constraint}"
)
# TODO: Include where filter specs when they are added to this class
time_range_constraint_is_valid = (
self.time_range_constraint is None
or PredicateInputType.TIME_RANGE_CONSTRAINT in self.pushdown_enabled_types
)
assert time_range_constraint_is_valid, (
"Invalid pushdown state configuration! Disabled pushdown state objects cannot have properties "
"set that may lead to improper access and use in other contexts, as that can lead to unintended "
"filtering operations in cases where these properties are accessed without appropriate checks against "
"pushdown configuration. The following properties should all have None values:\n"
f"time_range_constraint: {self.time_range_constraint}"
)

@property
def is_pushdown_disabled(self) -> bool:
"""Convenience accessor for checking if pushdown should always be skipped."""
return len(self.pushdown_enabled_types) == 0
def has_pushdown_potential(self) -> bool:
"""Returns whether or not pushdown is enabled for a type with predicate candidates in place."""
return self.has_time_range_constraint_to_push_down

@property
def is_pushdown_enabled_for_time_range_constraint(self) -> bool:
"""Convenience accessor for checking if pushdown is enabled for time range constraints.
def has_time_range_constraint_to_push_down(self) -> bool:
"""Convenience accessor for checking if there is a time range constraint that can be pushed down.
Note: this time range enabled state is a backwards compatibility shim for use with conversion metrics while
we determine how best to support predicate pushdown for conversion metrics. It may have longer term utility,
but ideally we'd collapse this with the more general time dimension filter input scenarios.
"""
return PredicateInputType.TIME_RANGE_CONSTRAINT in self.pushdown_enabled_types
return (
PredicateInputType.TIME_RANGE_CONSTRAINT in self.pushdown_enabled_types
and self.time_range_constraint is not None
)

@staticmethod
def with_time_range_constraint(
Expand Down Expand Up @@ -220,7 +226,23 @@ def __init__( # noqa: D107
self._semantic_model_lookup = semantic_model_lookup
self._join_evaluator = JoinDataflowOutputValidator(semantic_model_lookup)

def add_time_range_constraint(
def apply_matching_filter_predicates(
self,
source_nodes: Sequence[DataflowPlanNode],
predicate_pushdown_state: PredicatePushdownState,
metric_time_dimension_reference: TimeDimensionReference,
) -> Sequence[DataflowPlanNode]:
"""Adds filter predicate nodes to the input nodes as appropriate."""
if predicate_pushdown_state.has_time_range_constraint_to_push_down:
source_nodes = self._add_time_range_constraint(
source_nodes=source_nodes,
metric_time_dimension_reference=metric_time_dimension_reference,
time_range_constraint=predicate_pushdown_state.time_range_constraint,
)

return source_nodes

def _add_time_range_constraint(
self,
source_nodes: Sequence[DataflowPlanNode],
metric_time_dimension_reference: TimeDimensionReference,
Expand Down
6 changes: 3 additions & 3 deletions tests_metricflow/dataflow/builder/test_predicate_pushdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ def test_time_range_pushdown_enabled_states(fully_enabled_pushdown_state: Predic
)

enabled_states = {
"fully enabled": fully_enabled_pushdown_state.is_pushdown_enabled_for_time_range_constraint,
"enabled for time range only": time_range_only_state.is_pushdown_enabled_for_time_range_constraint,
"fully enabled": fully_enabled_pushdown_state.has_time_range_constraint_to_push_down,
"enabled for time range only": time_range_only_state.has_time_range_constraint_to_push_down,
}

assert all(list(enabled_states.values())), (
"Expected pushdown to be enabled for pushdown state with time range constraint and global pushdown enabled, "
"but some states returned False for is_pushdown_enabled_for_time_range_constraints.\n"
"but some states returned False for has_time_range_constraint_to_push_down.\n"
f"Pushdown enabled states: {enabled_states}\n"
f"Fully enabled state: {fully_enabled_pushdown_state}\n"
f"Time range only state: {time_range_only_state}"
Expand Down

0 comments on commit d995502

Please sign in to comment.