Skip to content

Commit

Permalink
Bug fix: apply offset for nested derived metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
courtneyholcomb committed Nov 16, 2023
1 parent 863352b commit a82ddea
Show file tree
Hide file tree
Showing 13 changed files with 4,257 additions and 6 deletions.
20 changes: 19 additions & 1 deletion metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ def _build_metrics_output_node(

for metric_spec in metric_specs:
logger.info(f"Generating compute metrics node for {metric_spec}")
join_to_time_spine_node: Optional[JoinToTimeSpineNode] = None
metric_reference = metric_spec.as_reference
metric = self._metric_lookup.get_metric(metric_reference)

Expand All @@ -203,6 +204,23 @@ def _build_metrics_output_node(
),
metric_specs=[metric_spec],
)

# For nested ratio / derived metrics with time offset, apply offset after metric computation.
if metric_spec.offset_window or metric_spec.offset_to_grain:
metric_time_dimension_specs = [
time_dimension_spec
for time_dimension_spec in queried_linkable_specs.time_dimension_specs
if time_dimension_spec.element_name == self._metric_time_dimension_reference.element_name
]
assert metric_time_dimension_specs, "Joining to time spine requires querying with metric time."
join_to_time_spine_node = JoinToTimeSpineNode(
parent_node=compute_metrics_node,
requested_metric_time_dimension_specs=metric_time_dimension_specs,
time_range_constraint=time_range_constraint,
offset_window=metric_spec.offset_window,
offset_to_grain=metric_spec.offset_to_grain,
join_type=SqlJoinType.INNER,
)
elif metric.type is MetricType.SIMPLE or MetricType.CUMULATIVE:
metric_input_measure_specs = self._metric_lookup.measures_for_metric(
metric_reference=metric_reference,
Expand Down Expand Up @@ -243,7 +261,7 @@ def _build_metrics_output_node(

assert compute_metrics_node is not None

output_nodes.append(compute_metrics_node)
output_nodes.append(join_to_time_spine_node or compute_metrics_node)

assert len(output_nodes) > 0, "ComputeMetricsNode was not properly constructed"

Expand Down
10 changes: 5 additions & 5 deletions metricflow/dataflow/dataflow_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,27 +675,27 @@ def id_prefix(cls) -> str: # noqa: D
return DATAFLOW_NODE_JOIN_TO_TIME_SPINE_ID_PREFIX

@property
def requested_metric_time_dimension_specs(self) -> List[TimeDimensionSpec]: # noqa: D
def requested_metric_time_dimension_specs(self) -> List[TimeDimensionSpec]:
"""Time dimension specs to use when creating time spine table."""
return self._requested_metric_time_dimension_specs

@property
def time_range_constraint(self) -> Optional[TimeRangeConstraint]: # noqa: D
def time_range_constraint(self) -> Optional[TimeRangeConstraint]:
"""Time range constraint to apply when querying time spine table."""
return self._time_range_constraint

@property
def offset_window(self) -> Optional[MetricTimeWindow]: # noqa: D
def offset_window(self) -> Optional[MetricTimeWindow]:
"""Time range constraint to apply when querying time spine table."""
return self._offset_window

@property
def offset_to_grain(self) -> Optional[TimeGranularity]: # noqa: D
def offset_to_grain(self) -> Optional[TimeGranularity]:
"""Time range constraint to apply when querying time spine table."""
return self._offset_to_grain

@property
def join_type(self) -> SqlJoinType: # noqa: D
def join_type(self) -> SqlJoinType:
"""Join type to use when joining to time spine."""
return self._join_type

Expand Down
26 changes: 26 additions & 0 deletions metricflow/test/dataflow/builder/test_dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -972,3 +972,29 @@ def test_dont_join_to_time_spine_if_no_time_dimension_requested( # noqa: D
mf_test_session_state=mf_test_session_state,
dag_graph=dataflow_plan,
)


def test_nested_derived_metric_with_outer_offset( # noqa: D
request: FixtureRequest,
mf_test_session_state: MetricFlowTestSessionState,
dataflow_plan_builder: DataflowPlanBuilder,
) -> None:
dataflow_plan = dataflow_plan_builder.build_plan(
MetricFlowQuerySpec(
metric_specs=(MetricSpec(element_name="bookings_offset_twice"),),
time_dimension_specs=(DataSet.metric_time_dimension_spec(TimeGranularity.DAY),),
)
)

assert_plan_snapshot_text_equal(
request=request,
mf_test_session_state=mf_test_session_state,
plan=dataflow_plan,
plan_snapshot_text=dataflow_plan_as_text(dataflow_plan),
)

display_graph_if_requested(
request=request,
mf_test_session_state=mf_test_session_state,
dag_graph=dataflow_plan,
)
Original file line number Diff line number Diff line change
Expand Up @@ -578,3 +578,35 @@ metric:
- name: bookings
offset_window: 14 days
alias: bookings_2_weeks_ago
---
metric:
name: "bookings_offset_once"
description: bookings metric offset once.
type: derived
type_params:
expr: 2 * bookings
metrics:
- name: bookings
offset_window: 5 days
---
metric:
name: "bookings_offset_twice"
description: bookings metric offset twice.
type: derived
type_params:
expr: 2 * bookings_offset_once
metrics:
- name: bookings_offset_once
offset_window: 5 days
---
metric:
name: booking_fees_since_start_of_month
description: nested derived metric with offset and multiple input metrics
type: derived
type_params:
expr: booking_fees - booking_fees_start_of_month
metrics:
- name: booking_fees
offset_to_grain: month
alias: booking_fees_start_of_month
- name: booking_fees
90 changes: 90 additions & 0 deletions metricflow/test/integration/test_cases/itest_metrics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1468,3 +1468,93 @@ integration_test:
GROUP BY
COALESCE(subq_9.metric_time__day, subq_19.metric_time__day)
, COALESCE(subq_9.listing__is_lux_latest, subq_19.listing__is_lux_latest)
---
integration_test:
name: nested_derived_metric_outer_offset
description: Tests a nested derived metric where the outer metric has an input metric with offset_window.
model: SIMPLE_MODEL
metrics: ["bookings_offset_twice"]
group_by_objs: [{"name": "metric_time"}]
check_query: |
SELECT
subq_9.ds AS metric_time__day
, 2 * bookings_offset_once AS bookings_offset_twice
FROM {{ source_schema }}.mf_time_spine subq_9
INNER JOIN (
SELECT
metric_time__day
, 2 * bookings AS bookings_offset_once
FROM (
SELECT
subq_3.ds AS metric_time__day
, SUM(subq_1.bookings) AS bookings
FROM {{ source_schema }}.mf_time_spine subq_3
INNER JOIN (
SELECT
{{ render_date_trunc("ds", TimeGranularity.DAY) }} AS metric_time__day
, 1 AS bookings
FROM {{ source_schema }}.fct_bookings bookings_source_src_1
) subq_1
ON
{{ render_date_sub("subq_3", "ds", 5, TimeGranularity.DAY) }} = subq_1.metric_time__day
GROUP BY
subq_3.ds
) subq_7
) subq_8
ON
{{ render_date_sub("subq_9", "ds", 5, TimeGranularity.DAY) }} = subq_8.metric_time__day
---
integration_test:
name: nested_derived_metric_outer_offset_multiple_input_metrics
description: Tests a nested derived metric where the outer metric has one input metric with offset_to_grain and another with no offset.
model: SIMPLE_MODEL
metrics: ["booking_fees_since_start_of_month"]
group_by_objs: [{"name": "metric_time"}]
check_query: |
SELECT
metric_time__day
, booking_fees - booking_fees_start_of_month AS booking_fees_since_start_of_month
FROM (
SELECT
COALESCE(subq_8.metric_time__day, subq_14.metric_time__day) AS metric_time__day
, MAX(subq_8.booking_fees_start_of_month) AS booking_fees_start_of_month
, MAX(subq_14.booking_fees) AS booking_fees
FROM (
SELECT
subq_7.ds AS metric_time__day
, subq_5.booking_fees_start_of_month AS booking_fees_start_of_month
FROM {{ source_schema }}.mf_time_spine subq_7
INNER JOIN (
SELECT
metric_time__day
, booking_value * 0.05 AS booking_fees_start_of_month
FROM (
SELECT
{{ render_date_trunc("ds", TimeGranularity.DAY) }} AS metric_time__day
, SUM(booking_value) AS booking_value
FROM {{ source_schema }}.fct_bookings bookings_source_src_1
GROUP BY
{{ render_date_trunc("ds", TimeGranularity.DAY) }}
) subq_4
) subq_5
ON
{{ render_date_trunc("subq_7.ds", TimeGranularity.MONTH) }} = subq_5.metric_time__day
) subq_8
FULL OUTER JOIN (
SELECT
metric_time__day
, booking_value * 0.05 AS booking_fees
FROM (
SELECT
{{ render_date_trunc("ds", TimeGranularity.DAY) }} AS metric_time__day
, SUM(booking_value) AS booking_value
FROM {{ source_schema }}.fct_bookings bookings_source_src_1
GROUP BY
{{ render_date_trunc("ds", TimeGranularity.DAY) }}
) subq_13
) subq_14
ON
subq_8.metric_time__day = subq_14.metric_time__day
GROUP BY
COALESCE(subq_8.metric_time__day, subq_14.metric_time__day)
) subq_15
54 changes: 54 additions & 0 deletions metricflow/test/plan_conversion/test_dataflow_to_sql_plan.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import logging
from typing import List

import pytest
Expand Down Expand Up @@ -1035,3 +1036,56 @@ def test_dimensions_requiring_join(
sql_client=sql_client,
node=dataflow_plan.sink_output_nodes[0].parent_node,
)


logger = logging.getLogger(__name__)


@pytest.mark.sql_engine_snapshot
def test_nested_offsets( # noqa: D
request: FixtureRequest,
mf_test_session_state: MetricFlowTestSessionState,
dataflow_plan_builder: DataflowPlanBuilder,
dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
sql_client: SqlClient,
create_source_tables: bool,
) -> None:
dataflow_plan = dataflow_plan_builder.build_plan(
query_spec=MetricFlowQuerySpec(
metric_specs=(MetricSpec(element_name="bookings_offset_twice"),),
time_dimension_specs=(MTD_SPEC_DAY,),
)
)

convert_and_check(
request=request,
mf_test_session_state=mf_test_session_state,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
node=dataflow_plan.sink_output_nodes[0].parent_node,
)


@pytest.mark.sql_engine_snapshot
def test_nested_derived_metric_with_offset_multiple_input_metrics( # noqa: D
request: FixtureRequest,
mf_test_session_state: MetricFlowTestSessionState,
dataflow_plan_builder: DataflowPlanBuilder,
dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
sql_client: SqlClient,
create_source_tables: bool,
) -> None:
dataflow_plan = dataflow_plan_builder.build_plan(
query_spec=MetricFlowQuerySpec(
metric_specs=(MetricSpec(element_name="booking_fees_since_start_of_month"),),
time_dimension_specs=(MTD_SPEC_DAY,),
)
)

convert_and_check(
request=request,
mf_test_session_state=mf_test_session_state,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
node=dataflow_plan.sink_output_nodes[0].parent_node,
)
Loading

0 comments on commit a82ddea

Please sign in to comment.