Skip to content

Commit

Permalink
Bug fix: apply time constraints after time offsets (#1544)
Browse files Browse the repository at this point in the history
Time constraints were being applied before time offsets in some
scenarios. This is incorrect because the values will change during the
offset. This fixes that. Customers likely never saw this bug because
time constraints are only available to open source users, and this may
never have been released to them.
  • Loading branch information
courtneyholcomb authored Dec 9, 2024
1 parent 7a00c2f commit c262086
Show file tree
Hide file tree
Showing 45 changed files with 5,478 additions and 4,822 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20241121-140607.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Fixes
body: Apply time constraints after time offsets to avoid filtering out values that
will change later in the query.
time: 2024-11-21T14:06:07.618628-08:00
custom:
Author: courtneyholcomb
Issue: "1544"
58 changes: 28 additions & 30 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,36 +648,39 @@ def _build_derived_metric_output_node(
aggregated_to_elements=set(queried_linkable_specs.as_tuple),
)

# For ratio / derived metrics with time offset, apply offset & where constraint after metric computation.
# For ratio / derived metrics with time offset, apply offset join here. Constraints will be applied after the offset
# to avoid filtering out values that will be changed.
if metric_spec.has_time_offset:
queried_agg_time_dimension_specs = queried_linkable_specs.included_agg_time_dimension_specs_for_metric(
metric_reference=metric_spec.reference, metric_lookup=self._metric_lookup
)
output_node = JoinToTimeSpineNode.create(
parent_node=output_node,
requested_agg_time_dimension_specs=queried_agg_time_dimension_specs,
time_range_constraint=predicate_pushdown_state.time_range_constraint,
offset_window=metric_spec.offset_window,
offset_to_grain=metric_spec.offset_to_grain,
join_type=SqlJoinType.INNER,
)

if len(metric_spec.filter_spec_set.all_filter_specs) > 0:
output_node = WhereConstraintNode.create(
parent_node=output_node, where_specs=metric_spec.filter_spec_set.all_filter_specs
)
if len(metric_spec.filter_spec_set.all_filter_specs) > 0 or predicate_pushdown_state.time_range_constraint:
# FilterElementsNode will only be needed if there are where filter specs that were selected in the group by.
specs_in_filters = set(
linkable_spec
for filter_spec in metric_spec.filter_spec_set.all_filter_specs
for linkable_spec in filter_spec.linkable_specs
)
filter_to_specs = None
if not specs_in_filters.issubset(queried_linkable_specs.as_tuple):
output_node = FilterElementsNode.create(
parent_node=output_node,
include_specs=InstanceSpecSet(metric_specs=(metric_spec,)).merge(
InstanceSpecSet.create_from_specs(queried_linkable_specs.as_tuple)
),
filter_to_specs = InstanceSpecSet(metric_specs=(metric_spec,)).merge(
InstanceSpecSet.create_from_specs(queried_linkable_specs.as_tuple)
)
output_node = self._build_pre_aggregation_plan(
source_node=output_node,
where_filter_specs=metric_spec.filter_spec_set.all_filter_specs,
time_range_constraint=predicate_pushdown_state.time_range_constraint,
filter_to_specs=filter_to_specs,
)

return output_node

def _get_base_agg_time_dimensions(
Expand Down Expand Up @@ -1645,7 +1648,6 @@ def _build_aggregated_measure_from_measure_source_node(
unaggregated_measure_node = JoinToTimeSpineNode.create(
parent_node=unaggregated_measure_node,
requested_agg_time_dimension_specs=base_agg_time_dimension_specs,
time_range_constraint=predicate_pushdown_state.time_range_constraint,
offset_window=before_aggregation_time_spine_join_description.offset_window,
offset_to_grain=before_aggregation_time_spine_join_description.offset_to_grain,
join_type=before_aggregation_time_spine_join_description.join_type,
Expand All @@ -1657,17 +1659,12 @@ def _build_aggregated_measure_from_measure_source_node(
# If this is the second layer of aggregation for a conversion metric, we have already joined the custom granularity.
if spec not in measure_recipe.all_linkable_specs_required_for_source_nodes.as_tuple
]
# If time constraint was previously adjusted for cumulative window or grain, apply original time constraint
# here. Can skip if metric is being aggregated over all time.
# Apply original time constraint if it wasn't applied to the source node recipe. For cumulative metrics, the constraint
# may have been expanded and needs to be narrowed here. For offsets, the constraint was deferred to after the offset.
# TODO - Pushdown: Encapsulate all of this window sliding bookkeeping in the pushdown params object
time_range_constraint_to_apply = (
predicate_pushdown_state.time_range_constraint
if (
cumulative_metric_adjusted_time_constraint is not None
and predicate_pushdown_state.time_range_constraint is not None
)
else None
)
time_range_constraint_to_apply = None
if cumulative_metric_adjusted_time_constraint or before_aggregation_time_spine_join_description:
time_range_constraint_to_apply = predicate_pushdown_state.time_range_constraint
unaggregated_measure_node = self._build_pre_aggregation_plan(
source_node=unaggregated_measure_node,
join_targets=measure_recipe.join_targets,
Expand Down Expand Up @@ -1753,11 +1750,11 @@ def _build_aggregated_measure_from_measure_source_node(
def _build_pre_aggregation_plan(
self,
source_node: DataflowPlanNode,
join_targets: List[JoinDescription],
custom_granularity_specs: Sequence[TimeDimensionSpec],
where_filter_specs: Sequence[WhereFilterSpec],
time_range_constraint: Optional[TimeRangeConstraint],
filter_to_specs: InstanceSpecSet,
filter_to_specs: Optional[InstanceSpecSet] = None,
join_targets: List[JoinDescription] = [],
custom_granularity_specs: Sequence[TimeDimensionSpec] = (),
where_filter_specs: Sequence[WhereFilterSpec] = (),
time_range_constraint: Optional[TimeRangeConstraint] = None,
measure_properties: Optional[MeasureSpecProperties] = None,
queried_linkable_specs: Optional[LinkableSpecSet] = None,
distinct: bool = False,
Expand Down Expand Up @@ -1791,9 +1788,10 @@ def _build_pre_aggregation_plan(
queried_linkable_specs=queried_linkable_specs,
parent_node=output_node,
)
output_node = FilterElementsNode.create(
parent_node=output_node, include_specs=filter_to_specs, distinct=distinct
)
if filter_to_specs:
output_node = FilterElementsNode.create(
parent_node=output_node, include_specs=filter_to_specs, distinct=distinct
)

return output_node

Expand Down
2 changes: 1 addition & 1 deletion tests_metricflow/integration/test_cases/itest_metrics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1705,7 +1705,6 @@ integration_test:
SELECT
ds AS metric_time__day
FROM {{ source_schema }}.mf_time_spine
WHERE {{ render_time_constraint('ds', '2019-12-19', '2020-01-02') }}
) subq_3
INNER JOIN (
SELECT
Expand All @@ -1716,6 +1715,7 @@ integration_test:
ON {{ render_date_sub("subq_3", "metric_time__day", 5, TimeGranularity.DAY) }} = subq_2.metric_time__day
GROUP BY subq_3.metric_time__day
) outer_subq
WHERE {{ render_time_constraint('metric_time__day', '2019-12-19', '2020-01-02') }}
---
integration_test:
name: cumulative_time_offset_metric_with_time_constraint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,6 @@ FROM (
SELECT
subq_6.ds AS metric_time__day
FROM ***************************.mf_time_spine subq_6
WHERE subq_6.ds BETWEEN '2019-12-19' AND '2020-01-02'
) subq_5
INNER JOIN (
-- Join Self Over Time Range
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,9 @@ FROM (
-- Aggregate Measures
-- Compute Metrics via Expressions
SELECT
subq_17.metric_time__day AS metric_time__day
subq_18.ds AS metric_time__day
, COUNT(DISTINCT subq_16.bookers) AS every_2_days_bookers_2_days_ago
FROM (
-- Time Spine
SELECT
ds AS metric_time__day
FROM ***************************.mf_time_spine subq_18
WHERE ds BETWEEN '2019-12-19' AND '2020-01-02'
) subq_17
FROM ***************************.mf_time_spine subq_18
INNER JOIN (
-- Join Self Over Time Range
SELECT
Expand All @@ -38,8 +32,8 @@ FROM (
)
) subq_16
ON
DATE_SUB(CAST(subq_17.metric_time__day AS DATETIME), INTERVAL 2 day) = subq_16.metric_time__day
WHERE subq_17.metric_time__day BETWEEN '2019-12-19' AND '2020-01-02'
DATE_SUB(CAST(subq_18.ds AS DATETIME), INTERVAL 2 day) = subq_16.metric_time__day
WHERE subq_18.ds BETWEEN '2019-12-19' AND '2020-01-02'
GROUP BY
metric_time__day
) subq_23
Loading

0 comments on commit c262086

Please sign in to comment.