Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug fix: apply offset for nested derived metrics #884

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ def _build_derived_metric_output_node(
queried_linkable_specs: LinkableSpecSet,
where_constraint: Optional[WhereFilterSpec] = None,
time_range_constraint: Optional[TimeRangeConstraint] = None,
) -> ComputeMetricsNode:
) -> BaseOutput:
"""Builds a node to compute a metric defined from other metrics."""
metric = self._metric_lookup.get_metric(metric_spec.reference)
metric_input_specs = self._metric_lookup.metric_input_specs_for_metric(
Expand All @@ -225,7 +225,7 @@ def _build_derived_metric_output_node(
f"For {metric.type} metric: {metric_spec}, needed metrics are:\n"
f"{pformat_big_objects(metric_input_specs=metric_input_specs)}"
)
return ComputeMetricsNode(
compute_metrics_node = ComputeMetricsNode(
parent_node=self._build_metrics_output_node(
metric_specs=metric_input_specs,
queried_linkable_specs=queried_linkable_specs,
Expand All @@ -234,14 +234,32 @@ def _build_derived_metric_output_node(
),
metric_specs=[metric_spec],
)
# For nested ratio / derived metrics with time offset, apply offset after metric computation.
join_to_time_spine_node: Optional[JoinToTimeSpineNode] = None
if metric_spec.offset_window or metric_spec.offset_to_grain:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if there's a filter on metric_time?

Today we generally do:

  1. join
  2. filter
  3. aggregate

The time spine join added here sits on top of the ComputeMetricsNode, which comes post-aggregation, so I think we'll end up joining the time spine against the already-filtered input set here, effectively truncating the input data for the second offset.

If I'm right about this, we'll need to propagate the offset information so we can expand that window. For now it might be adequate to apply the filter expression after the final calculation if there's a nested derived offset, and then we can optimize the predicate handling later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving this discussion over to a new PR, based off of one of Paul's branches

metric_time_dimension_specs = [
time_dimension_spec
for time_dimension_spec in queried_linkable_specs.time_dimension_specs
if time_dimension_spec.element_name == self._metric_time_dimension_reference.element_name
]
assert metric_time_dimension_specs, "Joining to time spine requires querying with metric time."
join_to_time_spine_node = JoinToTimeSpineNode(
parent_node=compute_metrics_node,
requested_metric_time_dimension_specs=metric_time_dimension_specs,
time_range_constraint=time_range_constraint,
offset_window=metric_spec.offset_window,
offset_to_grain=metric_spec.offset_to_grain,
join_type=SqlJoinType.INNER,
)
return join_to_time_spine_node or compute_metrics_node

def _build_any_metric_output_node(
self,
metric_spec: MetricSpec,
queried_linkable_specs: LinkableSpecSet,
where_constraint: Optional[WhereFilterSpec] = None,
time_range_constraint: Optional[TimeRangeConstraint] = None,
) -> ComputeMetricsNode:
) -> BaseOutput:
"""Builds a node to compute a metric of any type."""
metric = self._metric_lookup.get_metric(metric_spec.reference)

Expand Down
10 changes: 5 additions & 5 deletions metricflow/dataflow/dataflow_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,27 +675,27 @@ def id_prefix(cls) -> str: # noqa: D
return DATAFLOW_NODE_JOIN_TO_TIME_SPINE_ID_PREFIX

@property
def requested_metric_time_dimension_specs(self) -> List[TimeDimensionSpec]: # noqa: D
def requested_metric_time_dimension_specs(self) -> List[TimeDimensionSpec]:
"""Time dimension specs to use when creating time spine table."""
return self._requested_metric_time_dimension_specs

@property
def time_range_constraint(self) -> Optional[TimeRangeConstraint]: # noqa: D
def time_range_constraint(self) -> Optional[TimeRangeConstraint]:
"""Time range constraint to apply when querying time spine table."""
return self._time_range_constraint

@property
def offset_window(self) -> Optional[MetricTimeWindow]: # noqa: D
def offset_window(self) -> Optional[MetricTimeWindow]:
"""Time window to offset the parent dataset by when joining to time spine."""
return self._offset_window

@property
def offset_to_grain(self) -> Optional[TimeGranularity]: # noqa: D
def offset_to_grain(self) -> Optional[TimeGranularity]:
"""Time granularity to offset the parent dataset to when joining to time spine."""
return self._offset_to_grain

@property
def join_type(self) -> SqlJoinType: # noqa: D
def join_type(self) -> SqlJoinType:
"""Join type to use when joining to time spine."""
return self._join_type

Expand Down
26 changes: 26 additions & 0 deletions metricflow/test/dataflow/builder/test_dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -972,3 +972,29 @@ def test_dont_join_to_time_spine_if_no_time_dimension_requested( # noqa: D
mf_test_session_state=mf_test_session_state,
dag_graph=dataflow_plan,
)


def test_nested_derived_metric_with_outer_offset(  # noqa: D
    request: FixtureRequest,
    mf_test_session_state: MetricFlowTestSessionState,
    dataflow_plan_builder: DataflowPlanBuilder,
) -> None:
    """Snapshot-test the dataflow plan built for the `bookings_offset_twice` metric.

    The plan is built for that single metric grouped by metric time at day
    granularity, compared against the stored plan snapshot text, and then
    handed to the graph display helper.
    """
    # Query spec: one metric, grouped by day-level metric time.
    query_spec = MetricFlowQuerySpec(
        metric_specs=(MetricSpec(element_name="bookings_offset_twice"),),
        time_dimension_specs=(DataSet.metric_time_dimension_spec(TimeGranularity.DAY),),
    )
    plan_under_test = dataflow_plan_builder.build_plan(query_spec)

    # Compare the text rendering of the plan against the snapshot.
    rendered_plan = dataflow_plan_as_text(plan_under_test)
    assert_plan_snapshot_text_equal(
        request=request,
        mf_test_session_state=mf_test_session_state,
        plan=plan_under_test,
        plan_snapshot_text=rendered_plan,
    )

    display_graph_if_requested(
        request=request,
        mf_test_session_state=mf_test_session_state,
        dag_graph=plan_under_test,
    )
Original file line number Diff line number Diff line change
Expand Up @@ -578,3 +578,35 @@ metric:
- name: bookings
offset_window: 14 days
alias: bookings_2_weeks_ago
---
metric:
name: "bookings_offset_once"
description: bookings metric offset once.
type: derived
type_params:
expr: 2 * bookings
metrics:
- name: bookings
offset_window: 5 days
---
metric:
name: "bookings_offset_twice"
description: bookings metric offset twice.
type: derived
type_params:
expr: 2 * bookings_offset_once
metrics:
- name: bookings_offset_once
offset_window: 5 days
---
metric:
name: booking_fees_since_start_of_month
description: nested derived metric with offset and multiple input metrics
type: derived
type_params:
expr: booking_fees - booking_fees_start_of_month
metrics:
- name: booking_fees
offset_to_grain: month
alias: booking_fees_start_of_month
- name: booking_fees
90 changes: 90 additions & 0 deletions metricflow/test/integration/test_cases/itest_metrics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1468,3 +1468,93 @@ integration_test:
GROUP BY
COALESCE(subq_9.metric_time__day, subq_19.metric_time__day)
, COALESCE(subq_9.listing__is_lux_latest, subq_19.listing__is_lux_latest)
---
integration_test:
name: nested_derived_metric_outer_offset
description: Tests a nested derived metric where the outer metric has an input metric with offset_window.
model: SIMPLE_MODEL
metrics: ["bookings_offset_twice"]
group_by_objs: [{"name": "metric_time"}]
check_query: |
SELECT
subq_9.ds AS metric_time__day
, 2 * bookings_offset_once AS bookings_offset_twice
FROM {{ source_schema }}.mf_time_spine subq_9
INNER JOIN (
SELECT
metric_time__day
, 2 * bookings AS bookings_offset_once
FROM (
SELECT
subq_3.ds AS metric_time__day
, SUM(subq_1.bookings) AS bookings
FROM {{ source_schema }}.mf_time_spine subq_3
INNER JOIN (
SELECT
{{ render_date_trunc("ds", TimeGranularity.DAY) }} AS metric_time__day
, 1 AS bookings
FROM {{ source_schema }}.fct_bookings bookings_source_src_1
) subq_1
ON
{{ render_date_sub("subq_3", "ds", 5, TimeGranularity.DAY) }} = subq_1.metric_time__day
GROUP BY
subq_3.ds
) subq_7
) subq_8
ON
{{ render_date_sub("subq_9", "ds", 5, TimeGranularity.DAY) }} = subq_8.metric_time__day
---
integration_test:
name: nested_derived_metric_outer_offset_multiple_input_metrics
description: Tests a nested derived metric where the outer metric has one input metric with offset_to_grain and another with no offset.
model: SIMPLE_MODEL
metrics: ["booking_fees_since_start_of_month"]
group_by_objs: [{"name": "metric_time"}]
check_query: |
SELECT
metric_time__day
, booking_fees - booking_fees_start_of_month AS booking_fees_since_start_of_month
FROM (
SELECT
COALESCE(subq_8.metric_time__day, subq_14.metric_time__day) AS metric_time__day
, MAX(subq_8.booking_fees_start_of_month) AS booking_fees_start_of_month
, MAX(subq_14.booking_fees) AS booking_fees
FROM (
SELECT
subq_7.ds AS metric_time__day
, subq_5.booking_fees_start_of_month AS booking_fees_start_of_month
FROM {{ source_schema }}.mf_time_spine subq_7
INNER JOIN (
SELECT
metric_time__day
, booking_value * 0.05 AS booking_fees_start_of_month
FROM (
SELECT
{{ render_date_trunc("ds", TimeGranularity.DAY) }} AS metric_time__day
, SUM(booking_value) AS booking_value
FROM {{ source_schema }}.fct_bookings bookings_source_src_1
GROUP BY
{{ render_date_trunc("ds", TimeGranularity.DAY) }}
) subq_4
) subq_5
ON
{{ render_date_trunc("subq_7.ds", TimeGranularity.MONTH) }} = subq_5.metric_time__day
) subq_8
FULL OUTER JOIN (
SELECT
metric_time__day
, booking_value * 0.05 AS booking_fees
FROM (
SELECT
{{ render_date_trunc("ds", TimeGranularity.DAY) }} AS metric_time__day
, SUM(booking_value) AS booking_value
FROM {{ source_schema }}.fct_bookings bookings_source_src_1
GROUP BY
{{ render_date_trunc("ds", TimeGranularity.DAY) }}
) subq_13
) subq_14
ON
subq_8.metric_time__day = subq_14.metric_time__day
GROUP BY
COALESCE(subq_8.metric_time__day, subq_14.metric_time__day)
) subq_15
54 changes: 54 additions & 0 deletions metricflow/test/plan_conversion/test_dataflow_to_sql_plan.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import logging
from typing import List

import pytest
Expand Down Expand Up @@ -1035,3 +1036,56 @@ def test_dimensions_requiring_join(
sql_client=sql_client,
node=dataflow_plan.sink_output_nodes[0].parent_node,
)


logger = logging.getLogger(__name__)


@pytest.mark.sql_engine_snapshot
def test_nested_offsets( # noqa: D
courtneyholcomb marked this conversation as resolved.
Show resolved Hide resolved
request: FixtureRequest,
mf_test_session_state: MetricFlowTestSessionState,
dataflow_plan_builder: DataflowPlanBuilder,
dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
sql_client: SqlClient,
create_source_tables: bool,
) -> None:
dataflow_plan = dataflow_plan_builder.build_plan(
query_spec=MetricFlowQuerySpec(
metric_specs=(MetricSpec(element_name="bookings_offset_twice"),),
time_dimension_specs=(MTD_SPEC_DAY,),
)
)

convert_and_check(
request=request,
mf_test_session_state=mf_test_session_state,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
node=dataflow_plan.sink_output_nodes[0].parent_node,
)


@pytest.mark.sql_engine_snapshot
def test_nested_derived_metric_with_offset_multiple_input_metrics(  # noqa: D
    request: FixtureRequest,
    mf_test_session_state: MetricFlowTestSessionState,
    dataflow_plan_builder: DataflowPlanBuilder,
    dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
    sql_client: SqlClient,
    create_source_tables: bool,
) -> None:
    """Convert the plan for `booking_fees_since_start_of_month` to SQL and check it.

    The dataflow plan is built for that single metric grouped by `MTD_SPEC_DAY`,
    and the parent of its first sink node is passed to `convert_and_check`.
    `create_source_tables` is accepted as a parameter but not referenced in the body.
    """
    query_spec = MetricFlowQuerySpec(
        metric_specs=(MetricSpec(element_name="booking_fees_since_start_of_month"),),
        time_dimension_specs=(MTD_SPEC_DAY,),
    )
    plan_under_test = dataflow_plan_builder.build_plan(query_spec=query_spec)

    # Convert from the node feeding the first sink, not from the sink itself.
    node_to_convert = plan_under_test.sink_output_nodes[0].parent_node
    convert_and_check(
        request=request,
        mf_test_session_state=mf_test_session_state,
        dataflow_to_sql_converter=dataflow_to_sql_converter,
        sql_client=sql_client,
        node=node_to_convert,
    )
Loading
Loading