Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug fix: incorrect time constraint for time offset metrics #932

Merged
merged 6 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20231207-145013.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Fixes incorrect time constraint applied to derived offset metrics.
time: 2023-12-07T14:50:13.630957-08:00
custom:
Author: courtneyholcomb
Issue: "925"
35 changes: 18 additions & 17 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ def _build_derived_metric_output_node(
)

parent_nodes: List[BaseOutput] = []
metric_has_time_offset = bool(metric_spec.offset_window or metric_spec.offset_to_grain)
for metric_input_spec in metric_input_specs:
parent_nodes.append(
self._build_any_metric_output_node(
Expand All @@ -246,11 +245,12 @@ def _build_derived_metric_output_node(
offset_to_grain=metric_input_spec.offset_to_grain,
),
queried_linkable_specs=queried_linkable_specs
if not metric_has_time_offset
if not metric_spec.has_time_offset
else required_linkable_specs,
# If metric is offset, we'll apply constraint after offset to avoid removing values unexpectedly.
where_constraint=where_constraint if not metric_has_time_offset else None,
time_range_constraint=time_range_constraint if not metric_has_time_offset else None,
# If metric is offset, we'll apply where constraint after offset to avoid removing values unexpectedly.
# Time constraint will be applied by INNER JOINing to time spine.
where_constraint=where_constraint if not metric_spec.has_time_offset else None,
time_range_constraint=time_range_constraint if not metric_spec.has_time_offset else None,
)
)

Expand All @@ -260,7 +260,7 @@ def _build_derived_metric_output_node(
output_node: BaseOutput = ComputeMetricsNode(parent_node=parent_node, metric_specs=[metric_spec])

# For nested ratio / derived metrics with time offset, apply offset & where constraint after metric computation.
if metric_has_time_offset:
if metric_spec.has_time_offset:
assert (
queried_linkable_specs.contains_metric_time
), "Joining to time spine requires querying with metric_time."
Expand All @@ -272,10 +272,6 @@ def _build_derived_metric_output_node(
offset_to_grain=metric_spec.offset_to_grain,
join_type=SqlJoinType.INNER,
)
if time_range_constraint:
output_node = ConstrainTimeRangeNode(
parent_node=output_node, time_range_constraint=time_range_constraint
)
if where_constraint:
output_node = WhereConstraintNode(parent_node=output_node, where_constraint=where_constraint)
if not extraneous_linkable_specs.is_subset_of(queried_linkable_specs):
Expand Down Expand Up @@ -868,9 +864,14 @@ def _build_aggregated_measure_from_measure_source_node(
)

find_recipe_start_time = time.time()
before_aggregation_time_spine_join_description = (
metric_input_measure_spec.before_aggregation_time_spine_join_description
)
measure_recipe = self._find_dataflow_recipe(
measure_spec_properties=measure_properties,
time_range_constraint=cumulative_metric_adjusted_time_constraint or time_range_constraint,
time_range_constraint=(cumulative_metric_adjusted_time_constraint or time_range_constraint)
if not before_aggregation_time_spine_join_description
else None,
linkable_spec_set=required_linkable_specs,
)
logger.info(
Expand All @@ -894,15 +895,13 @@ def _build_aggregated_measure_from_measure_source_node(
parent_node=measure_recipe.source_node,
window=cumulative_window,
grain_to_date=cumulative_grain_to_date,
time_range_constraint=time_range_constraint,
time_range_constraint=time_range_constraint
if not before_aggregation_time_spine_join_description
else None,
)

# If querying an offset metric, join to time spine.
# If querying an offset metric, join to time spine before aggregation.
join_to_time_spine_node: Optional[JoinToTimeSpineNode] = None

before_aggregation_time_spine_join_description = (
metric_input_measure_spec.before_aggregation_time_spine_join_description
)
if before_aggregation_time_spine_join_description is not None:
assert (
queried_linkable_specs.contains_metric_time
Expand Down Expand Up @@ -1002,6 +1001,8 @@ def _build_aggregated_measure_from_measure_source_node(
parent_node=pre_aggregate_node,
metric_input_measure_specs=(metric_input_measure_spec,),
)

# Joining to time spine after aggregation is for measures that specify `join_to_timespine` in the YAML spec.
after_aggregation_time_spine_join_description = (
metric_input_measure_spec.after_aggregation_time_spine_join_description
)
Expand Down
4 changes: 4 additions & 0 deletions metricflow/specs/specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,10 @@ def reference(self) -> MetricReference:
"""Return the reference object that is used for referencing the associated metric in the manifest."""
return MetricReference(element_name=self.element_name)

@property
def has_time_offset(self) -> bool:
    """Return True when this spec has a truthy `offset_window` or `offset_to_grain`, else False."""
    # Check the window first, then fall back to the grain, mirroring `a or b` short-circuit order.
    if self.offset_window:
        return True
    return bool(self.offset_to_grain)

def without_offset(self) -> MetricSpec:
    """Build a new `MetricSpec` that copies this spec's name, constraint, and alias but passes no offset fields.

    Only `element_name`, `constraint`, and `alias` are forwarded to the constructor; any offset
    arguments are left at whatever defaults `MetricSpec` defines.
    """
    offset_free_spec = MetricSpec(
        element_name=self.element_name,
        constraint=self.constraint,
        alias=self.alias,
    )
    return offset_free_spec
Expand Down
61 changes: 61 additions & 0 deletions metricflow/test/integration/test_cases/itest_metrics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1633,6 +1633,67 @@ integration_test:
{{ render_date_sub("subq_9", "ds", 2, TimeGranularity.DAY) }} = subq_8.metric_time__day
WHERE {{ render_time_constraint('subq_9.ds', '2020-01-08', '2020-01-09') }}
---
integration_test:
name: time_offset_metric_with_time_constraint
description: Tests a derived offset metric with a time constraint
model: SIMPLE_MODEL
metrics: ["bookings_5_day_lag"]
group_by_objs: [{"name": "metric_time", "grain": "day"}]
time_constraint: ["2019-12-19", "2020-01-02"]
check_query: |
SELECT
metric_time__day
, bookings_5_days_ago AS bookings_5_day_lag
FROM (
SELECT
subq_3.metric_time__day AS metric_time__day
, SUM(subq_2.bookings) AS bookings_5_days_ago
FROM (
SELECT
ds AS metric_time__day
FROM {{ source_schema }}.mf_time_spine
WHERE {{ render_time_constraint('ds', '2019-12-19', '2020-01-02') }}
) subq_3
INNER JOIN (
SELECT
{{ render_date_trunc("ds", TimeGranularity.DAY) }} AS metric_time__day
, 1 AS bookings
FROM {{ source_schema }}.fct_bookings
) subq_2
ON {{ render_date_sub("subq_3", "metric_time__day", 5, TimeGranularity.DAY) }} = subq_2.metric_time__day
GROUP BY subq_3.metric_time__day
)
---
integration_test:
name: cumulative_time_offset_metric_with_time_constraint
description: Tests a cumulative derived offset metric with a time constraint
model: SIMPLE_MODEL
metrics: ["every_2_days_bookers_2_days_ago"]
group_by_objs: [{"name": "metric_time", "grain": "day"}]
time_constraint: ["2019-12-19", "2020-01-02"]
check_query: |
SELECT
subq_5.metric_time__day AS metric_time__day
, COUNT(DISTINCT subq_4.bookers) AS every_2_days_bookers_2_days_ago
FROM (
SELECT
ds AS metric_time__day
FROM {{ source_schema }}.mf_time_spine subq_6
WHERE {{ render_time_constraint('ds', '2019-12-19', '2020-01-02') }}
) subq_5
INNER JOIN (
SELECT
subq_3.ds AS metric_time__day
, b.guest_id AS bookers
FROM {{ source_schema }}.mf_time_spine subq_3
INNER JOIN
{{ source_schema }}.fct_bookings b
ON ({{ render_date_trunc("b.ds", TimeGranularity.DAY) }} <= subq_3.ds)
AND ({{ render_date_trunc("b.ds", TimeGranularity.DAY) }} > {{ render_date_sub("subq_3", "ds", 2, TimeGranularity.DAY) }})
) subq_4
ON {{ render_date_sub("subq_5", "metric_time__day", 2, TimeGranularity.DAY) }} = subq_4.metric_time__day
GROUP BY subq_5.metric_time__day
---
integration_test:
name: nested_derived_metric_offset_with_joined_where_constraint_not_selected
description: Tests a nested derived metric where the outer metric has an offset and where constraint that requires an additional join, and is not used in the select statement.
Expand Down
56 changes: 56 additions & 0 deletions metricflow/test/query_rendering/test_derived_metric_rendering.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,62 @@ def test_nested_offsets_with_time_constraint( # noqa: D
)


@pytest.mark.sql_engine_snapshot
def test_time_offset_metric_with_time_constraint(
    request: FixtureRequest,
    mf_test_session_state: MetricFlowTestSessionState,
    dataflow_plan_builder: DataflowPlanBuilder,
    dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
    sql_client: SqlClient,
    create_source_tables: bool,
) -> None:
    """Check the plan built for `bookings_5_day_lag` by `MTD_SPEC_DAY` with a time range constraint.

    The query is constrained to 2019-12-19 through 2020-01-02. The resulting plan's first sink
    node parent is handed to `convert_and_check` together with the session fixtures.

    NOTE(review): `create_source_tables` is not referenced in the body; presumably it is requested
    only for its fixture side effects - confirm against the fixture definition.
    """
    constraint = TimeRangeConstraint(
        start_time=datetime.datetime(2019, 12, 19),
        end_time=datetime.datetime(2020, 1, 2),
    )
    query_spec = MetricFlowQuerySpec(
        metric_specs=(MetricSpec(element_name="bookings_5_day_lag"),),
        time_dimension_specs=(MTD_SPEC_DAY,),
        time_range_constraint=constraint,
    )
    plan = dataflow_plan_builder.build_plan(query_spec=query_spec)

    # The node under test is the parent of the plan's first sink output node.
    node_under_test = plan.sink_output_nodes[0].parent_node
    convert_and_check(
        request=request,
        mf_test_session_state=mf_test_session_state,
        dataflow_to_sql_converter=dataflow_to_sql_converter,
        sql_client=sql_client,
        node=node_under_test,
    )


@pytest.mark.sql_engine_snapshot
def test_cumulative_time_offset_metric_with_time_constraint(
    request: FixtureRequest,
    mf_test_session_state: MetricFlowTestSessionState,
    dataflow_plan_builder: DataflowPlanBuilder,
    dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
    sql_client: SqlClient,
    create_source_tables: bool,
) -> None:
    """Check the plan built for `every_2_days_bookers_2_days_ago` by `MTD_SPEC_DAY` with a time range constraint.

    The query is constrained to 2019-12-19 through 2020-01-02. The resulting plan's first sink
    node parent is handed to `convert_and_check` together with the session fixtures.

    NOTE(review): `create_source_tables` is not referenced in the body; presumably it is requested
    only for its fixture side effects - confirm against the fixture definition.
    """
    constraint = TimeRangeConstraint(
        start_time=datetime.datetime(2019, 12, 19),
        end_time=datetime.datetime(2020, 1, 2),
    )
    query_spec = MetricFlowQuerySpec(
        metric_specs=(MetricSpec(element_name="every_2_days_bookers_2_days_ago"),),
        time_dimension_specs=(MTD_SPEC_DAY,),
        time_range_constraint=constraint,
    )
    plan = dataflow_plan_builder.build_plan(query_spec=query_spec)

    # The node under test is the parent of the plan's first sink output node.
    node_under_test = plan.sink_output_nodes[0].parent_node
    convert_and_check(
        request=request,
        mf_test_session_state=mf_test_session_state,
        dataflow_to_sql_converter=dataflow_to_sql_converter,
        sql_client=sql_client,
        node=node_under_test,
    )


@pytest.mark.sql_engine_snapshot
def test_nested_derived_metric_offset_with_joined_where_constraint_not_selected( # noqa: D
request: FixtureRequest,
Expand Down
Loading