diff --git a/metricflow/dataflow/builder/dataflow_plan_builder.py b/metricflow/dataflow/builder/dataflow_plan_builder.py index bb25b1bea3..7e6b7a3bfe 100644 --- a/metricflow/dataflow/builder/dataflow_plan_builder.py +++ b/metricflow/dataflow/builder/dataflow_plan_builder.py @@ -1323,9 +1323,14 @@ def _build_aggregated_measure_from_measure_source_node( if not before_aggregation_time_spine_join_description else None ) - measure_pushdown_params = PredicatePushdownParameters.with_time_range_constraint( - predicate_pushdown_params, time_range_constraint=measure_time_constraint - ) + if measure_time_constraint is None: + measure_pushdown_params = PredicatePushdownParameters.without_time_range_constraint( + predicate_pushdown_params + ) + else: + measure_pushdown_params = PredicatePushdownParameters.with_time_range_constraint( + predicate_pushdown_params, time_range_constraint=measure_time_constraint + ) find_recipe_start_time = time.time() measure_recipe = self._find_dataflow_recipe( @@ -1448,12 +1453,12 @@ def _build_aggregated_measure_from_measure_source_node( if non_additive_dimension_spec is not None: # Apply semi additive join on the node agg_time_dimension = measure_properties.agg_time_dimension - queried_time_dimension_spec: Optional[TimeDimensionSpec] = ( - self._find_non_additive_dimension_in_linkable_specs( - agg_time_dimension=agg_time_dimension, - linkable_specs=queried_linkable_specs.as_tuple, - non_additive_dimension_spec=non_additive_dimension_spec, - ) + queried_time_dimension_spec: Optional[ + TimeDimensionSpec + ] = self._find_non_additive_dimension_in_linkable_specs( + agg_time_dimension=agg_time_dimension, + linkable_specs=queried_linkable_specs.as_tuple, + non_additive_dimension_spec=non_additive_dimension_spec, ) time_dimension_spec = TimeDimensionSpec.from_name(non_additive_dimension_spec.name) window_groupings = tuple( diff --git a/metricflow/plan_conversion/node_processor.py b/metricflow/plan_conversion/node_processor.py index 1c5ad09fa1..b664e1ad2d 100644 --- a/metricflow/plan_conversion/node_processor.py +++ b/metricflow/plan_conversion/node_processor.py @@ -140,20 +140,29 @@ def is_pushdown_enabled_for_time_range_constraint(self) -> bool: @staticmethod def with_time_range_constraint( - original_pushdown_params: PredicatePushdownParameters, time_range_constraint: Optional[TimeRangeConstraint] + original_pushdown_params: PredicatePushdownParameters, time_range_constraint: TimeRangeConstraint ) -> PredicatePushdownParameters: - """Factory method for overriding the time range constraint value in a given set of pushdown parameters. + """Factory method for adding or updating a time range constraint input to a set of pushdown parameters. - This allows for crude updates to the core time range constraint, including selectively disabling time range - predicate pushdown in certain sub-branches of the dataflow plan, such as in complex cases involving time spine - joins and cumulative metrics. + This allows for temporarily overriding a time range constraint with an adjusted one, or enabling a time + range constraint filter if one becomes available mid-stream during dataflow plan construction. """ - if original_pushdown_params.is_pushdown_enabled_for_time_range_constraint: - return PredicatePushdownParameters( - time_range_constraint=time_range_constraint, - ) - else: - return original_pushdown_params + pushdown_enabled_types = original_pushdown_params.pushdown_enabled_types.union( + {PushdownPredicateInputType.TIME_RANGE_CONSTRAINT} + ) + return PredicatePushdownParameters( + time_range_constraint=time_range_constraint, pushdown_enabled_types=pushdown_enabled_types + ) + + @staticmethod + def without_time_range_constraint( + original_pushdown_params: PredicatePushdownParameters, + ) -> PredicatePushdownParameters: + """Factory method for removing the time range constraint, if any, from the given set of pushdown parameters.""" + pushdown_enabled_types = original_pushdown_params.pushdown_enabled_types.difference( + {PushdownPredicateInputType.TIME_RANGE_CONSTRAINT} + ) + return PredicatePushdownParameters(time_range_constraint=None, pushdown_enabled_types=pushdown_enabled_types) @staticmethod def with_pushdown_disabled() -> PredicatePushdownParameters: