Skip to content

Commit

Permalink
Refine time constraint management for predicate pushdown tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
tlento committed May 17, 2024
1 parent 0fb06dc commit 680270f
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 20 deletions.
23 changes: 14 additions & 9 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -1323,9 +1323,14 @@ def _build_aggregated_measure_from_measure_source_node(
if not before_aggregation_time_spine_join_description
else None
)
measure_pushdown_params = PredicatePushdownParameters.with_time_range_constraint(
predicate_pushdown_params, time_range_constraint=measure_time_constraint
)
if measure_time_constraint is None:
measure_pushdown_params = PredicatePushdownParameters.without_time_range_constraint(
predicate_pushdown_params
)
else:
measure_pushdown_params = PredicatePushdownParameters.with_time_range_constraint(
predicate_pushdown_params, time_range_constraint=measure_time_constraint
)

find_recipe_start_time = time.time()
measure_recipe = self._find_dataflow_recipe(
Expand Down Expand Up @@ -1448,12 +1453,12 @@ def _build_aggregated_measure_from_measure_source_node(
if non_additive_dimension_spec is not None:
# Apply semi additive join on the node
agg_time_dimension = measure_properties.agg_time_dimension
queried_time_dimension_spec: Optional[TimeDimensionSpec] = (
self._find_non_additive_dimension_in_linkable_specs(
agg_time_dimension=agg_time_dimension,
linkable_specs=queried_linkable_specs.as_tuple,
non_additive_dimension_spec=non_additive_dimension_spec,
)
queried_time_dimension_spec: Optional[
TimeDimensionSpec
] = self._find_non_additive_dimension_in_linkable_specs(
agg_time_dimension=agg_time_dimension,
linkable_specs=queried_linkable_specs.as_tuple,
non_additive_dimension_spec=non_additive_dimension_spec,
)
time_dimension_spec = TimeDimensionSpec.from_name(non_additive_dimension_spec.name)
window_groupings = tuple(
Expand Down
31 changes: 20 additions & 11 deletions metricflow/plan_conversion/node_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,20 +140,29 @@ def is_pushdown_enabled_for_time_range_constraint(self) -> bool:

@staticmethod
def with_time_range_constraint(
original_pushdown_params: PredicatePushdownParameters, time_range_constraint: Optional[TimeRangeConstraint]
original_pushdown_params: PredicatePushdownParameters, time_range_constraint: TimeRangeConstraint
) -> PredicatePushdownParameters:
"""Factory method for overriding the time range constraint value in a given set of pushdown parameters.
"""Factory method for adding or updating a time range constraint input to a set of pushdown parameters.
This allows for crude updates to the core time range constraint, including selectively disabling time range
predicate pushdown in certain sub-branches of the dataflow plan, such as in complex cases involving time spine
joins and cumulative metrics.
This allows for temporarily overriding a time range constraint with an adjusted one, or enabling a time
range constraint filter if one becomes available mid-stream during dataflow plan construction.
"""
if original_pushdown_params.is_pushdown_enabled_for_time_range_constraint:
return PredicatePushdownParameters(
time_range_constraint=time_range_constraint,
)
else:
return original_pushdown_params
pushdown_enabled_types = original_pushdown_params.pushdown_enabled_types.union(
{PushdownPredicateInputType.TIME_RANGE_CONSTRAINT}
)
return PredicatePushdownParameters(
time_range_constraint=time_range_constraint, pushdown_enabled_types=pushdown_enabled_types
)

@staticmethod
def without_time_range_constraint(
original_pushdown_params: PredicatePushdownParameters,
) -> PredicatePushdownParameters:
"""Factory method for removing the time range constraint, if any, from the given set of pushdown parameters."""
pushdown_enabled_types = original_pushdown_params.pushdown_enabled_types.difference(
{PushdownPredicateInputType.TIME_RANGE_CONSTRAINT}
)
return PredicatePushdownParameters(time_range_constraint=None, pushdown_enabled_types=pushdown_enabled_types)

@staticmethod
def with_pushdown_disabled() -> PredicatePushdownParameters:
Expand Down

0 comments on commit 680270f

Please sign in to comment.