Skip to content

Commit

Permalink
Bug fix: selective where filters after join to time spine (#1120)
Browse files Browse the repository at this point in the history
When an input metric specifies `join_to_timespine: true` in its YAML
definition, we apply special behavior for filters. Before this PR, we
were applying all filters twice - before and after the time spine join.
This is because you might end up with new rows after the time spine join
that should be filtered out. This resulted in a bug sometimes if you
filtered by a spec that was not in the group by, since the filter column
no longer existed at that level of the query.
This fixes that bug by only applying the post-join filter if the
filtered spec is also in the group by. The specs in the group by should
be the only ones changed after the time spine join, anyway.
  • Loading branch information
courtneyholcomb authored May 23, 2024
1 parent 4f7dc28 commit 6758d03
Show file tree
Hide file tree
Showing 60 changed files with 9,117 additions and 7 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20240521-165853.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Fixes
body: For metrics that join to time spine, apply post-join filters only for specs
that are in the group by.
time: 2024-05-21T16:58:53.09277-07:00
custom:
Author: courtneyholcomb
Issue: "1119"
21 changes: 17 additions & 4 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -1452,10 +1452,23 @@ def _build_aggregated_measure_from_measure_source_node(
offset_window=after_aggregation_time_spine_join_description.offset_window,
offset_to_grain=after_aggregation_time_spine_join_description.offset_to_grain,
)
# Since new rows might have been added due to time spine join, apply constraints again here.
if len(metric_input_measure_spec.filter_specs) > 0:
output_node = WhereConstraintNode(parent_node=output_node, where_constraint=merged_where_filter_spec)
if time_range_constraint is not None:

# Since new rows might have been added due to time spine join, re-apply constraints here. Only re-apply filters
# for specs that are also in the queried specs, since those are the only ones that might have changed after the
# time spine join.
queried_filter_specs = [
filter_spec
for filter_spec in metric_input_measure_spec.filter_specs
if set(filter_spec.linkable_specs).issubset(set(queried_linkable_specs.as_tuple))
]
if len(queried_filter_specs) > 0:
output_node = WhereConstraintNode(
parent_node=output_node, where_constraint=WhereFilterSpec.merge_iterable(queried_filter_specs)
)

# TODO: this will break if you query by agg_time_dimension but apply a time constraint on metric_time.
# To fix when enabling time range constraints for agg_time_dimension.
if queried_agg_time_dimension_specs and time_range_constraint is not None:
output_node = ConstrainTimeRangeNode(
parent_node=output_node, time_range_constraint=time_range_constraint
)
Expand Down
79 changes: 79 additions & 0 deletions tests_metricflow/integration/test_cases/itest_metrics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2131,3 +2131,82 @@ integration_test:
) metric_subquery
ON t.customer_third_hop_id = metric_subquery.customer_third_hop_id
WHERE txn_count > 0
---
integration_test:
name: simple_join_to_time_spine_with_filter
description: Test a simple metric that joins to time spine, but doesn't fill nulls, filtered by a dimension that's not in the group by. Should apply constraint only once.
model: SIMPLE_MODEL
metrics: ["bookings_join_to_time_spine"]
group_by_objs: [{"name": "metric_time"}]
where_filter: "{{ render_dimension_template('booking__is_instant') }}"
check_query: |
SELECT
b.ds AS metric_time__day
, a.bookings AS bookings_join_to_time_spine
FROM {{ source_schema }}.mf_time_spine b
LEFT JOIN (
SELECT
{{ render_date_trunc("ds", TimeGranularity.DAY) }} AS ds
, SUM(1) AS bookings
FROM {{ source_schema }}.fct_bookings
WHERE is_instant
GROUP BY {{ render_date_trunc("ds", TimeGranularity.DAY) }}
) a ON b.ds = a.ds
---
integration_test:
name: simple_join_to_time_spine_with_queried_filter
description: Test a simple metric that joins to time spine, but doesn't fill nulls, filtered by a dimension that is in the group by. Should apply constraint twice.
model: SIMPLE_MODEL
metrics: ["bookings_join_to_time_spine"]
group_by_objs: [{"name": "metric_time"}, {"name": "booking__is_instant"}]
where_filter: "{{ render_dimension_template('booking__is_instant') }}"
check_query: |
SELECT
b.ds AS metric_time__day
, a.is_instant AS booking__is_instant
, a.bookings AS bookings_join_to_time_spine
FROM {{ source_schema }}.mf_time_spine b
LEFT JOIN (
SELECT
{{ render_date_trunc("ds", TimeGranularity.DAY) }} AS ds
, is_instant
, SUM(1) AS bookings
FROM {{ source_schema }}.fct_bookings
WHERE is_instant
GROUP BY {{ render_date_trunc("ds", TimeGranularity.DAY) }}, is_instant
) a ON b.ds = a.ds
WHERE is_instant
---
integration_test:
name: join_to_time_spine_with_time_constraint
description: Test join to time spine with time constraint that isn't in group by. Should skip both time spine join and second constraint.
model: SIMPLE_MODEL
metrics: ["bookings_join_to_time_spine"]
time_constraint: ["2020-01-03", "2020-01-03"]
check_query: |
SELECT
SUM(1) AS bookings_join_to_time_spine
FROM {{ source_schema }}.fct_bookings
WHERE {{ render_between_time_constraint("ds", "2020-01-03", "2020-01-03") }}
---
integration_test:
name: join_to_time_spine_with_queried_time_constraint
description: Test join to time spine with time constraint that is in group by. Should apply constraint twice.
model: SIMPLE_MODEL
metrics: ["bookings_join_to_time_spine"]
group_by_objs: [{"name": "metric_time"}]
time_constraint: ["2020-01-03", "2020-01-03"]
check_query: |
SELECT
b.ds AS metric_time__day
, a.bookings AS bookings_join_to_time_spine
FROM {{ source_schema }}.mf_time_spine b
LEFT JOIN (
SELECT
{{ render_date_trunc("ds", TimeGranularity.DAY) }} AS ds
, SUM(1) AS bookings
FROM {{ source_schema }}.fct_bookings
WHERE {{ render_between_time_constraint("ds", "2020-01-03", "2020-01-03") }}
GROUP BY {{ render_date_trunc("ds", TimeGranularity.DAY) }}
) a ON b.ds = a.ds
WHERE {{ render_between_time_constraint("b.ds", "2020-01-03", "2020-01-03") }}
113 changes: 110 additions & 3 deletions tests_metricflow/query_rendering/test_time_spine_join_rendering.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@

from __future__ import annotations

import datetime

import pytest
from _pytest.fixtures import FixtureRequest
from dbt_semantic_interfaces.implementations.filters.where_filter import PydanticWhereFilter
from dbt_semantic_interfaces.type_enums.time_granularity import TimeGranularity
from metricflow_semantics.query.query_parser import MetricFlowQueryParser
from metricflow_semantics.specs.query_spec import MetricFlowQuerySpec
from metricflow_semantics.specs.spec_classes import (
MetricSpec,
)
from metricflow_semantics.specs.spec_classes import MetricSpec
from metricflow_semantics.test_helpers.config_helpers import MetricFlowTestConfiguration

from metricflow.dataflow.builder.dataflow_plan_builder import DataflowPlanBuilder
Expand Down Expand Up @@ -46,3 +48,108 @@ def test_simple_join_to_time_spine( # noqa: D103
sql_client=sql_client,
node=dataflow_plan.sink_node,
)


@pytest.mark.sql_engine_snapshot
def test_simple_join_to_time_spine_with_filter(
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
dataflow_plan_builder: DataflowPlanBuilder,
dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
sql_client: SqlClient,
query_parser: MetricFlowQueryParser,
) -> None:
"""Test case where metric fills nulls and filter is not in group by. Should apply constraint once."""
query_spec = query_parser.parse_and_validate_query(
metric_names=("bookings_fill_nulls_with_0",),
group_by_names=("metric_time__day",),
where_constraint=PydanticWhereFilter(where_sql_template="{{ Dimension('booking__is_instant') }}"),
).query_spec
dataflow_plan = dataflow_plan_builder.build_plan(query_spec)

convert_and_check(
request=request,
mf_test_configuration=mf_test_configuration,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
node=dataflow_plan.sink_node,
)


@pytest.mark.sql_engine_snapshot
def test_simple_join_to_time_spine_with_queried_filter(
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
dataflow_plan_builder: DataflowPlanBuilder,
dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
sql_client: SqlClient,
query_parser: MetricFlowQueryParser,
) -> None:
"""Test case where metric fills nulls and filter is in group by. Should apply constraint twice."""
query_spec = query_parser.parse_and_validate_query(
metric_names=("bookings_fill_nulls_with_0",),
group_by_names=("metric_time__day", "booking__is_instant"),
where_constraint=PydanticWhereFilter(where_sql_template="{{ Dimension('booking__is_instant') }}"),
).query_spec
dataflow_plan = dataflow_plan_builder.build_plan(query_spec)

convert_and_check(
request=request,
mf_test_configuration=mf_test_configuration,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
node=dataflow_plan.sink_node,
)


@pytest.mark.sql_engine_snapshot
def test_join_to_time_spine_with_time_constraint(
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
dataflow_plan_builder: DataflowPlanBuilder,
dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
sql_client: SqlClient,
query_parser: MetricFlowQueryParser,
) -> None:
"""Test case where metric that fills nulls is queried with a time constraint. Should apply constraint once."""
query_spec = query_parser.parse_and_validate_query(
metric_names=("bookings_fill_nulls_with_0",),
time_constraint_start=datetime.datetime(2020, 1, 3),
time_constraint_end=datetime.datetime(2020, 1, 5),
).query_spec
dataflow_plan = dataflow_plan_builder.build_plan(query_spec)

convert_and_check(
request=request,
mf_test_configuration=mf_test_configuration,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
node=dataflow_plan.sink_node,
)


@pytest.mark.sql_engine_snapshot
def test_join_to_time_spine_with_queried_time_constraint(
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
dataflow_plan_builder: DataflowPlanBuilder,
dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
sql_client: SqlClient,
query_parser: MetricFlowQueryParser,
) -> None:
"""Test case where metric that fills nulls is queried with metric time and a time constraint. Should apply constraint twice."""
query_spec = query_parser.parse_and_validate_query(
metric_names=("bookings_fill_nulls_with_0",),
group_by_names=("metric_time__day",),
time_constraint_start=datetime.datetime(2020, 1, 3),
time_constraint_end=datetime.datetime(2020, 1, 5),
).query_spec
dataflow_plan = dataflow_plan_builder.build_plan(query_spec)

convert_and_check(
request=request,
mf_test_configuration=mf_test_configuration,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
node=dataflow_plan.sink_node,
)
Loading

0 comments on commit 6758d03

Please sign in to comment.