Skip to content

Commit

Permalink
Nested derived offset metrics: keep filter column until used
Browse files Browse the repository at this point in the history
  • Loading branch information
courtneyholcomb committed Dec 6, 2023
1 parent b73bf99 commit 8d9ce5c
Show file tree
Hide file tree
Showing 6 changed files with 516 additions and 1 deletion.
15 changes: 14 additions & 1 deletion metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ def _build_derived_metric_output_node(
f"{pformat_big_objects(metric_input_specs=metric_input_specs)}"
)

required_linkable_specs, extraneous_linkable_specs = self.__get_required_and_extraneous_linkable_specs(
queried_linkable_specs=queried_linkable_specs, where_constraint=where_constraint
)

parent_nodes: List[BaseOutput] = []
metric_has_time_offset = bool(metric_spec.offset_window or metric_spec.offset_to_grain)
for metric_input_spec in metric_input_specs:
Expand All @@ -241,7 +245,9 @@ def _build_derived_metric_output_node(
offset_window=metric_input_spec.offset_window,
offset_to_grain=metric_input_spec.offset_to_grain,
),
queried_linkable_specs=queried_linkable_specs,
queried_linkable_specs=queried_linkable_specs
if not metric_has_time_offset
else required_linkable_specs,
# If metric is offset, we'll apply constraint after offset to avoid removing values unexpectedly.
where_constraint=where_constraint if not metric_has_time_offset else None,
time_range_constraint=time_range_constraint if not metric_has_time_offset else None,
Expand Down Expand Up @@ -272,6 +278,13 @@ def _build_derived_metric_output_node(
)
if where_constraint:
output_node = WhereConstraintNode(parent_node=output_node, where_constraint=where_constraint)
if not extraneous_linkable_specs.is_subset_of(queried_linkable_specs):
output_node = FilterElementsNode(
parent_node=output_node,
include_specs=InstanceSpecSet(metric_specs=(metric_spec.without_offset(),)).merge(
queried_linkable_specs.as_spec_set
),
)
return output_node

def _build_any_metric_output_node(
Expand Down
4 changes: 4 additions & 0 deletions metricflow/specs/specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,10 @@ def reference(self) -> MetricReference:
"""Return the reference object that is used for referencing the associated metric in the manifest."""
return MetricReference(element_name=self.element_name)

def without_offset(self) -> MetricSpec:
"""Represents the metric spec with any time offsets removed."""
return MetricSpec(element_name=self.element_name, constraint=self.constraint, alias=self.alias)


@dataclass(frozen=True)
class CumulativeMeasureDescription:
Expand Down
50 changes: 50 additions & 0 deletions metricflow/test/integration/test_cases/itest_metrics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1632,3 +1632,53 @@ integration_test:
ON
{{ render_date_sub("subq_9", "ds", 2, TimeGranularity.DAY) }} = subq_8.metric_time__day
WHERE {{ render_time_constraint('subq_9.ds', '2020-01-08', '2020-01-09') }}
---
integration_test:
name: nested_derived_metric_offset_with_joined_where_constraint_not_selected
description: Tests a nested derived metric where the outer metric has an offset and where constraint that requires an additional join, and is not used in the select statement.
model: SIMPLE_MODEL
metrics: ["bookings_offset_twice"]
group_by_objs: [{"name": "metric_time", "grain": "day"}]
where_filter: "{{ render_dimension_template('booking__is_instant') }}"
check_query: |
SELECT
metric_time__day
, 2 * bookings_offset_once AS bookings_offset_twice
FROM (
SELECT
metric_time__day
, bookings_offset_once
FROM (
SELECT
subq_10.ds AS metric_time__day
, subq_8.booking__is_instant AS booking__is_instant
, subq_8.bookings_offset_once AS bookings_offset_once
FROM {{ source_schema }}.mf_time_spine subq_10
INNER JOIN (
SELECT
metric_time__day
, booking__is_instant
, 2 * bookings AS bookings_offset_once
FROM (
SELECT
subq_3.ds AS metric_time__day
, subq_1.booking__is_instant AS booking__is_instant
, SUM(subq_1.bookings) AS bookings
FROM {{ source_schema }}.mf_time_spine subq_3
INNER JOIN (
SELECT
{{ render_date_trunc("ds", TimeGranularity.DAY) }} AS metric_time__day
, is_instant AS booking__is_instant
, 1 AS bookings
FROM {{ source_schema }}.fct_bookings
) subq_1
ON subq_3.ds - INTERVAL 5 day = subq_1.metric_time__day
GROUP BY
subq_3.ds
, subq_1.booking__is_instant
) subq_7
) subq_8
ON subq_10.ds - INTERVAL 2 day = subq_8.metric_time__day
) subq_11
WHERE booking__is_instant
)
31 changes: 31 additions & 0 deletions metricflow/test/query_rendering/test_derived_metric_rendering.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,3 +440,34 @@ def test_nested_offsets_with_time_constraint( # noqa: D
sql_client=sql_client,
node=dataflow_plan.sink_output_nodes[0].parent_node,
)


@pytest.mark.sql_engine_snapshot
def test_nested_derived_metric_offset_with_joined_where_constraint_not_selected( # noqa: D
request: FixtureRequest,
mf_test_session_state: MetricFlowTestSessionState,
dataflow_plan_builder: DataflowPlanBuilder,
dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
sql_client: SqlClient,
create_source_tables: bool,
column_association_resolver: ColumnAssociationResolver,
) -> None:
dataflow_plan = dataflow_plan_builder.build_plan(
query_spec=MetricFlowQuerySpec(
metric_specs=(MetricSpec(element_name="bookings_offset_twice"),),
time_dimension_specs=(MTD_SPEC_DAY,),
where_constraint=WhereSpecFactory(
column_association_resolver=column_association_resolver,
).create_from_where_filter(
PydanticWhereFilter(where_sql_template=("{{ Dimension('booking__is_instant') }} "))
),
)
)

convert_and_check(
request=request,
mf_test_session_state=mf_test_session_state,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
node=dataflow_plan.sink_output_nodes[0].parent_node,
)
Loading

0 comments on commit 8d9ce5c

Please sign in to comment.