Skip to content

Commit

Permalink
Bug fix: apply offset for nested derived metrics (#886)
Browse files Browse the repository at this point in the history
  • Loading branch information
courtneyholcomb authored Nov 21, 2023
1 parent 2128182 commit cef3330
Show file tree
Hide file tree
Showing 56 changed files with 10,767 additions and 29 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20231116-153955.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Apply time offset for nested dervied & ratio metrics
time: 2023-11-16T15:39:55.211195-08:00
custom:
Author: courtneyholcomb
Issue: "882"
53 changes: 30 additions & 23 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ def _build_derived_metric_output_node(
queried_linkable_specs: LinkableSpecSet,
where_constraint: Optional[WhereFilterSpec] = None,
time_range_constraint: Optional[TimeRangeConstraint] = None,
) -> ComputeMetricsNode:
) -> BaseOutput:
"""Builds a node to compute a metric defined from other metrics."""
metric = self._metric_lookup.get_metric(metric_spec.reference)
metric_input_specs = self._metric_lookup.metric_input_specs_for_metric(
Expand All @@ -230,17 +230,8 @@ def _build_derived_metric_output_node(
)

parent_nodes: List[BaseOutput] = []

metric_has_time_offset = bool(metric_spec.offset_window or metric_spec.offset_to_grain)
for metric_input_spec in metric_input_specs:
# TODO: See: https://github.com/dbt-labs/metricflow/issues/881
if (metric_spec.offset_window is not None or metric_spec.offset_to_grain is not None) and (
metric_input_spec.offset_window is not None or metric_input_spec.offset_to_grain is not None
):
raise NotImplementedError(
f"Multiple descendent metrics in a derived metric hierarchy are not yet supported. "
f"For {metric_spec}, the parent metric input is {metric_input_spec}"
)

parent_nodes.append(
self._build_any_metric_output_node(
metric_spec=MetricSpec(
Expand All @@ -251,29 +242,45 @@ def _build_derived_metric_output_node(
offset_to_grain=metric_input_spec.offset_to_grain,
),
queried_linkable_specs=queried_linkable_specs,
where_constraint=where_constraint,
time_range_constraint=time_range_constraint,
# If metric is offset, we'll apply constraint after offset to avoid removing values unexpectedly.
where_constraint=where_constraint if not metric_has_time_offset else None,
time_range_constraint=time_range_constraint if not metric_has_time_offset else None,
)
)

if len(parent_nodes) == 1:
return ComputeMetricsNode(
parent_node=parent_nodes[0],
metric_specs=[metric_spec],
)

return ComputeMetricsNode(
parent_node=CombineAggregatedOutputsNode(parent_nodes=parent_nodes),
metric_specs=[metric_spec],
parent_node = (
parent_nodes[0] if len(parent_nodes) == 1 else CombineAggregatedOutputsNode(parent_nodes=parent_nodes)
)
output_node: BaseOutput = ComputeMetricsNode(parent_node=parent_node, metric_specs=[metric_spec])

# For nested ratio / derived metrics with time offset, apply offset & where constraint after metric computation.
if metric_has_time_offset:
assert (
queried_linkable_specs.contains_metric_time
), "Joining to time spine requires querying with metric_time."
output_node = JoinToTimeSpineNode(
parent_node=output_node,
requested_metric_time_dimension_specs=list(queried_linkable_specs.metric_time_specs),
time_range_constraint=time_range_constraint,
offset_window=metric_spec.offset_window,
offset_to_grain=metric_spec.offset_to_grain,
join_type=SqlJoinType.INNER,
)
if time_range_constraint:
output_node = ConstrainTimeRangeNode(
parent_node=output_node, time_range_constraint=time_range_constraint
)
if where_constraint:
output_node = WhereConstraintNode(parent_node=output_node, where_constraint=where_constraint)
return output_node

def _build_any_metric_output_node(
self,
metric_spec: MetricSpec,
queried_linkable_specs: LinkableSpecSet,
where_constraint: Optional[WhereFilterSpec] = None,
time_range_constraint: Optional[TimeRangeConstraint] = None,
) -> ComputeMetricsNode:
) -> BaseOutput:
"""Builds a node to compute a metric of any type."""
metric = self._metric_lookup.get_metric(metric_spec.reference)

Expand Down
12 changes: 6 additions & 6 deletions metricflow/dataflow/dataflow_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ def __init__(
time_range_constraint: Optional[TimeRangeConstraint] = None,
offset_window: Optional[MetricTimeWindow] = None,
offset_to_grain: Optional[TimeGranularity] = None,
) -> None: # noqa: D
) -> None:
"""Constructor.
Args:
Expand Down Expand Up @@ -679,27 +679,27 @@ def id_prefix(cls) -> str: # noqa: D
return DATAFLOW_NODE_JOIN_TO_TIME_SPINE_ID_PREFIX

@property
def requested_metric_time_dimension_specs(self) -> List[TimeDimensionSpec]: # noqa: D
def requested_metric_time_dimension_specs(self) -> List[TimeDimensionSpec]:
"""Time dimension specs to use when creating time spine table."""
return self._requested_metric_time_dimension_specs

@property
def time_range_constraint(self) -> Optional[TimeRangeConstraint]: # noqa: D
def time_range_constraint(self) -> Optional[TimeRangeConstraint]:
"""Time range constraint to apply when querying time spine table."""
return self._time_range_constraint

@property
def offset_window(self) -> Optional[MetricTimeWindow]: # noqa: D
def offset_window(self) -> Optional[MetricTimeWindow]:
"""Time range constraint to apply when querying time spine table."""
return self._offset_window

@property
def offset_to_grain(self) -> Optional[TimeGranularity]: # noqa: D
def offset_to_grain(self) -> Optional[TimeGranularity]:
"""Time range constraint to apply when querying time spine table."""
return self._offset_to_grain

@property
def join_type(self) -> SqlJoinType: # noqa: D
def join_type(self) -> SqlJoinType:
"""Join type to use when joining to time spine."""
return self._join_type

Expand Down
26 changes: 26 additions & 0 deletions metricflow/test/dataflow/builder/test_dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -972,3 +972,29 @@ def test_dont_join_to_time_spine_if_no_time_dimension_requested( # noqa: D
mf_test_session_state=mf_test_session_state,
dag_graph=dataflow_plan,
)


def test_nested_derived_metric_with_outer_offset( # noqa: D
request: FixtureRequest,
mf_test_session_state: MetricFlowTestSessionState,
dataflow_plan_builder: DataflowPlanBuilder,
) -> None:
dataflow_plan = dataflow_plan_builder.build_plan(
MetricFlowQuerySpec(
metric_specs=(MetricSpec(element_name="bookings_offset_twice"),),
time_dimension_specs=(DataSet.metric_time_dimension_spec(TimeGranularity.DAY),),
)
)

assert_plan_snapshot_text_equal(
request=request,
mf_test_session_state=mf_test_session_state,
plan=dataflow_plan,
plan_snapshot_text=dataflow_plan_as_text(dataflow_plan),
)

display_graph_if_requested(
request=request,
mf_test_session_state=mf_test_session_state,
dag_graph=dataflow_plan,
)
Original file line number Diff line number Diff line change
Expand Up @@ -578,3 +578,35 @@ metric:
- name: bookings
offset_window: 14 days
alias: bookings_2_weeks_ago
---
metric:
name: "bookings_offset_once"
description: bookings metric offset once.
type: derived
type_params:
expr: 2 * bookings
metrics:
- name: bookings
offset_window: 5 days
---
metric:
name: "bookings_offset_twice"
description: bookings metric offset twice.
type: derived
type_params:
expr: 2 * bookings_offset_once
metrics:
- name: bookings_offset_once
offset_window: 2 days
---
metric:
name: booking_fees_since_start_of_month
description: nested derived metric with offset and multiple input metrics
type: derived
type_params:
expr: booking_fees - booking_fees_start_of_month
metrics:
- name: booking_fees
offset_to_grain: month
alias: booking_fees_start_of_month
- name: booking_fees
164 changes: 164 additions & 0 deletions metricflow/test/integration/test_cases/itest_metrics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1468,3 +1468,167 @@ integration_test:
GROUP BY
COALESCE(subq_9.metric_time__day, subq_19.metric_time__day)
, COALESCE(subq_9.listing__is_lux_latest, subq_19.listing__is_lux_latest)
---
integration_test:
name: nested_derived_metric_outer_offset
description: Tests a nested derived metric where the outer metric has an input metric with offset_window.
model: SIMPLE_MODEL
metrics: ["bookings_offset_twice"]
group_by_objs: [{"name": "metric_time"}]
check_query: |
SELECT
subq_9.ds AS metric_time__day
, 2 * bookings_offset_once AS bookings_offset_twice
FROM {{ source_schema }}.mf_time_spine subq_9
INNER JOIN (
SELECT
metric_time__day
, 2 * bookings AS bookings_offset_once
FROM (
SELECT
subq_3.ds AS metric_time__day
, SUM(subq_1.bookings) AS bookings
FROM {{ source_schema }}.mf_time_spine subq_3
INNER JOIN (
SELECT
{{ render_date_trunc("ds", TimeGranularity.DAY) }} AS metric_time__day
, 1 AS bookings
FROM {{ source_schema }}.fct_bookings bookings_source_src_1
) subq_1
ON
{{ render_date_sub("subq_3", "ds", 5, TimeGranularity.DAY) }} = subq_1.metric_time__day
GROUP BY
subq_3.ds
) subq_7
) subq_8
ON
{{ render_date_sub("subq_9", "ds", 2, TimeGranularity.DAY) }} = subq_8.metric_time__day
---
integration_test:
name: nested_derived_metric_outer_offset_multiple_input_metrics
description: Tests a nested derived metric where the outer metric has one input metric with offset_to_grain and another with no offset.
model: SIMPLE_MODEL
metrics: ["booking_fees_since_start_of_month"]
group_by_objs: [{"name": "metric_time"}]
check_query: |
SELECT
metric_time__day
, booking_fees - booking_fees_start_of_month AS booking_fees_since_start_of_month
FROM (
SELECT
COALESCE(subq_8.metric_time__day, subq_14.metric_time__day) AS metric_time__day
, MAX(subq_8.booking_fees_start_of_month) AS booking_fees_start_of_month
, MAX(subq_14.booking_fees) AS booking_fees
FROM (
SELECT
subq_7.ds AS metric_time__day
, subq_5.booking_fees_start_of_month AS booking_fees_start_of_month
FROM {{ source_schema }}.mf_time_spine subq_7
INNER JOIN (
SELECT
metric_time__day
, booking_value * 0.05 AS booking_fees_start_of_month
FROM (
SELECT
{{ render_date_trunc("ds", TimeGranularity.DAY) }} AS metric_time__day
, SUM(booking_value) AS booking_value
FROM {{ source_schema }}.fct_bookings bookings_source_src_1
GROUP BY
{{ render_date_trunc("ds", TimeGranularity.DAY) }}
) subq_4
) subq_5
ON
{{ render_date_trunc("subq_7.ds", TimeGranularity.MONTH) }} = subq_5.metric_time__day
) subq_8
FULL OUTER JOIN (
SELECT
metric_time__day
, booking_value * 0.05 AS booking_fees
FROM (
SELECT
{{ render_date_trunc("ds", TimeGranularity.DAY) }} AS metric_time__day
, SUM(booking_value) AS booking_value
FROM {{ source_schema }}.fct_bookings bookings_source_src_1
GROUP BY
{{ render_date_trunc("ds", TimeGranularity.DAY) }}
) subq_13
) subq_14
ON
subq_8.metric_time__day = subq_14.metric_time__day
GROUP BY
COALESCE(subq_8.metric_time__day, subq_14.metric_time__day)
) subq_15
---
integration_test:
name: nested_derived_metric_offset_with_where_constraint
description: Tests a nested derived metric where the outer metric has an offset and a constraint is applied to metric_time via where clause.
model: SIMPLE_MODEL
metrics: ["bookings_offset_twice"]
group_by_objs: [{"name": "metric_time", "grain": "day"}]
where_filter: "{{ render_time_constraint('metric_time__day', '2020-01-08', '2020-01-09') }}"
check_query: |
SELECT
subq_9.ds AS metric_time__day
, 2 * bookings_offset_once AS bookings_offset_twice
FROM {{ source_schema }}.mf_time_spine subq_9
INNER JOIN (
SELECT
metric_time__day
, 2 * bookings AS bookings_offset_once
FROM (
SELECT
subq_3.ds AS metric_time__day
, SUM(subq_1.bookings) AS bookings
FROM {{ source_schema }}.mf_time_spine subq_3
INNER JOIN (
SELECT
{{ render_date_trunc("ds", TimeGranularity.DAY) }} AS metric_time__day
, 1 AS bookings
FROM {{ source_schema }}.fct_bookings bookings_source_src_1
) subq_1
ON
{{ render_date_sub("subq_3", "ds", 5, TimeGranularity.DAY) }} = subq_1.metric_time__day
GROUP BY
subq_3.ds
) subq_7
) subq_8
ON
{{ render_date_sub("subq_9", "ds", 2, TimeGranularity.DAY) }} = subq_8.metric_time__day
WHERE {{ render_time_constraint('subq_9.ds', '2020-01-08', '2020-01-09') }}
---
integration_test:
name: nested_derived_metric_offset_with_time_constraint
description: Tests a nested derived metric where the outer metric has an offset and a constraint is applied to metric_time via time constraint params.
model: SIMPLE_MODEL
metrics: ["bookings_offset_twice"]
group_by_objs: [{"name": "metric_time", "grain": "day"}]
time_constraint: ["2020-01-08", "2020-01-09"]
check_query: |
SELECT
subq_9.ds AS metric_time__day
, 2 * bookings_offset_once AS bookings_offset_twice
FROM {{ source_schema }}.mf_time_spine subq_9
INNER JOIN (
SELECT
metric_time__day
, 2 * bookings AS bookings_offset_once
FROM (
SELECT
subq_3.ds AS metric_time__day
, SUM(subq_1.bookings) AS bookings
FROM {{ source_schema }}.mf_time_spine subq_3
INNER JOIN (
SELECT
{{ render_date_trunc("ds", TimeGranularity.DAY) }} AS metric_time__day
, 1 AS bookings
FROM {{ source_schema }}.fct_bookings bookings_source_src_1
) subq_1
ON
{{ render_date_sub("subq_3", "ds", 5, TimeGranularity.DAY) }} = subq_1.metric_time__day
GROUP BY
subq_3.ds
) subq_7
) subq_8
ON
{{ render_date_sub("subq_9", "ds", 2, TimeGranularity.DAY) }} = subq_8.metric_time__day
WHERE {{ render_time_constraint('subq_9.ds', '2020-01-08', '2020-01-09') }}
Loading

0 comments on commit cef3330

Please sign in to comment.