Skip to content

Commit

Permalink
Bug fix: apply offset for nested derived metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
courtneyholcomb committed Nov 17, 2023
1 parent 4b2604d commit 2704a05
Show file tree
Hide file tree
Showing 11 changed files with 1,250 additions and 25 deletions.
39 changes: 20 additions & 19 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ def _build_derived_metric_output_node(
queried_linkable_specs: LinkableSpecSet,
where_constraint: Optional[WhereFilterSpec] = None,
time_range_constraint: Optional[TimeRangeConstraint] = None,
) -> ComputeMetricsNode:
) -> BaseOutput:
"""Builds a node to compute a metric defined from other metrics."""
metric = self._metric_lookup.get_metric(metric_spec.reference)
metric_input_specs = self._metric_lookup.metric_input_specs_for_metric(
Expand All @@ -232,15 +232,6 @@ def _build_derived_metric_output_node(
parent_nodes: List[BaseOutput] = []

for metric_input_spec in metric_input_specs:
# TODO: See: https://github.com/dbt-labs/metricflow/issues/881
if (metric_spec.offset_to_grain is not None or metric_spec.offset_to_grain is not None) and (
metric_input_spec.offset_window is not None or metric_input_spec.offset_to_grain is not None
):
raise NotImplementedError(
f"Multiple descendent metrics in a derived metric hierarchy are not yet supported. "
f"For {metric_spec}, the parent metric input is {metric_input_spec}"
)

parent_nodes.append(
self._build_any_metric_output_node(
metric_spec=MetricSpec(
Expand All @@ -256,24 +247,34 @@ def _build_derived_metric_output_node(
)
)

if len(parent_nodes) == 1:
return ComputeMetricsNode(
parent_node=parent_nodes[0],
metric_specs=[metric_spec],
)

return ComputeMetricsNode(
parent_node=CombineMetricsNode(parent_nodes=parent_nodes),
compute_metrics_node = ComputeMetricsNode(
parent_node=parent_nodes[0] if len(parent_nodes) == 1 else CombineMetricsNode(parent_nodes=parent_nodes),
metric_specs=[metric_spec],
)

# For nested ratio / derived metrics with time offset, apply offset after metric computation.
join_to_time_spine_node: Optional[JoinToTimeSpineNode] = None
if metric_spec.offset_window or metric_spec.offset_to_grain:
assert (
queried_linkable_specs.contains_metric_time
), "Joining to time spine requires querying with metric_time."
join_to_time_spine_node = JoinToTimeSpineNode(
parent_node=compute_metrics_node,
requested_metric_time_dimension_specs=list(queried_linkable_specs.metric_time_specs),
time_range_constraint=time_range_constraint,
offset_window=metric_spec.offset_window,
offset_to_grain=metric_spec.offset_to_grain,
join_type=SqlJoinType.INNER,
)
return join_to_time_spine_node or compute_metrics_node

def _build_any_metric_output_node(
self,
metric_spec: MetricSpec,
queried_linkable_specs: LinkableSpecSet,
where_constraint: Optional[WhereFilterSpec] = None,
time_range_constraint: Optional[TimeRangeConstraint] = None,
) -> ComputeMetricsNode:
) -> BaseOutput:
"""Builds a node to compute a metric of any type."""
metric = self._metric_lookup.get_metric(metric_spec.reference)

Expand Down
12 changes: 6 additions & 6 deletions metricflow/dataflow/dataflow_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ def __init__(
time_range_constraint: Optional[TimeRangeConstraint] = None,
offset_window: Optional[MetricTimeWindow] = None,
offset_to_grain: Optional[TimeGranularity] = None,
) -> None: # noqa: D
) -> None:
"""Constructor.
Args:
Expand Down Expand Up @@ -679,27 +679,27 @@ def id_prefix(cls) -> str: # noqa: D
return DATAFLOW_NODE_JOIN_TO_TIME_SPINE_ID_PREFIX

@property
def requested_metric_time_dimension_specs(self) -> List[TimeDimensionSpec]: # noqa: D
def requested_metric_time_dimension_specs(self) -> List[TimeDimensionSpec]:
"""Time dimension specs to use when creating time spine table."""
return self._requested_metric_time_dimension_specs

@property
def time_range_constraint(self) -> Optional[TimeRangeConstraint]: # noqa: D
def time_range_constraint(self) -> Optional[TimeRangeConstraint]:
"""Time range constraint to apply when querying time spine table."""
return self._time_range_constraint

@property
def offset_window(self) -> Optional[MetricTimeWindow]: # noqa: D
def offset_window(self) -> Optional[MetricTimeWindow]:
"""Time range constraint to apply when querying time spine table."""
return self._offset_window

@property
def offset_to_grain(self) -> Optional[TimeGranularity]: # noqa: D
def offset_to_grain(self) -> Optional[TimeGranularity]:
"""Time range constraint to apply when querying time spine table."""
return self._offset_to_grain

@property
def join_type(self) -> SqlJoinType: # noqa: D
def join_type(self) -> SqlJoinType:
"""Join type to use when joining to time spine."""
return self._join_type

Expand Down
26 changes: 26 additions & 0 deletions metricflow/test/dataflow/builder/test_dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -972,3 +972,29 @@ def test_dont_join_to_time_spine_if_no_time_dimension_requested( # noqa: D
mf_test_session_state=mf_test_session_state,
dag_graph=dataflow_plan,
)


def test_nested_derived_metric_with_outer_offset( # noqa: D
request: FixtureRequest,
mf_test_session_state: MetricFlowTestSessionState,
dataflow_plan_builder: DataflowPlanBuilder,
) -> None:
dataflow_plan = dataflow_plan_builder.build_plan(
MetricFlowQuerySpec(
metric_specs=(MetricSpec(element_name="bookings_offset_twice"),),
time_dimension_specs=(DataSet.metric_time_dimension_spec(TimeGranularity.DAY),),
)
)

assert_plan_snapshot_text_equal(
request=request,
mf_test_session_state=mf_test_session_state,
plan=dataflow_plan,
plan_snapshot_text=dataflow_plan_as_text(dataflow_plan),
)

display_graph_if_requested(
request=request,
mf_test_session_state=mf_test_session_state,
dag_graph=dataflow_plan,
)
Original file line number Diff line number Diff line change
Expand Up @@ -578,3 +578,35 @@ metric:
- name: bookings
offset_window: 14 days
alias: bookings_2_weeks_ago
---
metric:
name: "bookings_offset_once"
description: bookings metric offset once.
type: derived
type_params:
expr: 2 * bookings
metrics:
- name: bookings
offset_window: 5 days
---
metric:
name: "bookings_offset_twice"
description: bookings metric offset twice.
type: derived
type_params:
expr: 2 * bookings_offset_once
metrics:
- name: bookings_offset_once
offset_window: 5 days
---
metric:
name: booking_fees_since_start_of_month
description: nested derived metric with offset and multiple input metrics
type: derived
type_params:
expr: booking_fees - booking_fees_start_of_month
metrics:
- name: booking_fees
offset_to_grain: month
alias: booking_fees_start_of_month
- name: booking_fees
90 changes: 90 additions & 0 deletions metricflow/test/integration/test_cases/itest_metrics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1468,3 +1468,93 @@ integration_test:
GROUP BY
COALESCE(subq_9.metric_time__day, subq_19.metric_time__day)
, COALESCE(subq_9.listing__is_lux_latest, subq_19.listing__is_lux_latest)
---
integration_test:
name: nested_derived_metric_outer_offset
description: Tests a nested derived metric where the outer metric has an input metric with offset_window.
model: SIMPLE_MODEL
metrics: ["bookings_offset_twice"]
group_by_objs: [{"name": "metric_time"}]
check_query: |
SELECT
subq_9.ds AS metric_time__day
, 2 * bookings_offset_once AS bookings_offset_twice
FROM {{ source_schema }}.mf_time_spine subq_9
INNER JOIN (
SELECT
metric_time__day
, 2 * bookings AS bookings_offset_once
FROM (
SELECT
subq_3.ds AS metric_time__day
, SUM(subq_1.bookings) AS bookings
FROM {{ source_schema }}.mf_time_spine subq_3
INNER JOIN (
SELECT
{{ render_date_trunc("ds", TimeGranularity.DAY) }} AS metric_time__day
, 1 AS bookings
FROM {{ source_schema }}.fct_bookings bookings_source_src_1
) subq_1
ON
{{ render_date_sub("subq_3", "ds", 5, TimeGranularity.DAY) }} = subq_1.metric_time__day
GROUP BY
subq_3.ds
) subq_7
) subq_8
ON
{{ render_date_sub("subq_9", "ds", 5, TimeGranularity.DAY) }} = subq_8.metric_time__day
---
integration_test:
name: nested_derived_metric_outer_offset_multiple_input_metrics
description: Tests a nested derived metric where the outer metric has one input metric with offset_to_grain and another with no offset.
model: SIMPLE_MODEL
metrics: ["booking_fees_since_start_of_month"]
group_by_objs: [{"name": "metric_time"}]
check_query: |
SELECT
metric_time__day
, booking_fees - booking_fees_start_of_month AS booking_fees_since_start_of_month
FROM (
SELECT
COALESCE(subq_8.metric_time__day, subq_14.metric_time__day) AS metric_time__day
, MAX(subq_8.booking_fees_start_of_month) AS booking_fees_start_of_month
, MAX(subq_14.booking_fees) AS booking_fees
FROM (
SELECT
subq_7.ds AS metric_time__day
, subq_5.booking_fees_start_of_month AS booking_fees_start_of_month
FROM {{ source_schema }}.mf_time_spine subq_7
INNER JOIN (
SELECT
metric_time__day
, booking_value * 0.05 AS booking_fees_start_of_month
FROM (
SELECT
{{ render_date_trunc("ds", TimeGranularity.DAY) }} AS metric_time__day
, SUM(booking_value) AS booking_value
FROM {{ source_schema }}.fct_bookings bookings_source_src_1
GROUP BY
{{ render_date_trunc("ds", TimeGranularity.DAY) }}
) subq_4
) subq_5
ON
{{ render_date_trunc("subq_7.ds", TimeGranularity.MONTH) }} = subq_5.metric_time__day
) subq_8
FULL OUTER JOIN (
SELECT
metric_time__day
, booking_value * 0.05 AS booking_fees
FROM (
SELECT
{{ render_date_trunc("ds", TimeGranularity.DAY) }} AS metric_time__day
, SUM(booking_value) AS booking_value
FROM {{ source_schema }}.fct_bookings bookings_source_src_1
GROUP BY
{{ render_date_trunc("ds", TimeGranularity.DAY) }}
) subq_13
) subq_14
ON
subq_8.metric_time__day = subq_14.metric_time__day
GROUP BY
COALESCE(subq_8.metric_time__day, subq_14.metric_time__day)
) subq_15
50 changes: 50 additions & 0 deletions metricflow/test/query_rendering/test_derived_metric_rendering.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,3 +323,53 @@ def test_derived_offset_cumulative_metric( # noqa: D
sql_client=sql_client,
node=dataflow_plan.sink_output_nodes[0].parent_node,
)


@pytest.mark.sql_engine_snapshot
def test_nested_offsets( # noqa: D
request: FixtureRequest,
mf_test_session_state: MetricFlowTestSessionState,
dataflow_plan_builder: DataflowPlanBuilder,
dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
sql_client: SqlClient,
create_source_tables: bool,
) -> None:
dataflow_plan = dataflow_plan_builder.build_plan(
query_spec=MetricFlowQuerySpec(
metric_specs=(MetricSpec(element_name="bookings_offset_twice"),),
time_dimension_specs=(MTD_SPEC_DAY,),
)
)

convert_and_check(
request=request,
mf_test_session_state=mf_test_session_state,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
node=dataflow_plan.sink_output_nodes[0].parent_node,
)


@pytest.mark.sql_engine_snapshot
def test_nested_derived_metric_with_offset_multiple_input_metrics( # noqa: D
request: FixtureRequest,
mf_test_session_state: MetricFlowTestSessionState,
dataflow_plan_builder: DataflowPlanBuilder,
dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
sql_client: SqlClient,
create_source_tables: bool,
) -> None:
dataflow_plan = dataflow_plan_builder.build_plan(
query_spec=MetricFlowQuerySpec(
metric_specs=(MetricSpec(element_name="booking_fees_since_start_of_month"),),
time_dimension_specs=(MTD_SPEC_DAY,),
)
)

convert_and_check(
request=request,
mf_test_session_state=mf_test_session_state,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
node=dataflow_plan.sink_output_nodes[0].parent_node,
)
Loading

0 comments on commit 2704a05

Please sign in to comment.