Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
courtneyholcomb committed Dec 18, 2024
1 parent e91fd30 commit f5b1e3a
Show file tree
Hide file tree
Showing 10 changed files with 1,495 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -860,3 +860,24 @@ metric:
- name: instant_bookings
alias: shared_alias
---
metric:
name: bookings_offset_one_martian_day
description: bookings offset by one martian_day
type: derived
type_params:
expr: bookings
metrics:
- name: bookings
offset_window: 1 martian_day
---
metric:
name: bookings_martian_day_over_martian_day
description: bookings growth martian day over martian day
type: derived
type_params:
expr: bookings - bookings_offset / NULLIF(bookings_offset, 0)
metrics:
- name: bookings
offset_window: 1 martian_day
alias: bookings_offset
- name: bookings
8 changes: 7 additions & 1 deletion metricflow/sql/optimizer/rewriting_sub_query_reducer.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,13 @@ def _rewrite_node_with_join(self, node: SqlSelectStatementNode) -> SqlSelectStat
join_select_node = join_desc.right_source.as_select_node

# Verifying that it's simple makes it easier to reason about the logic.
if not join_select_node or not SqlRewritingSubQueryReducerVisitor._is_simple_source(join_select_node):
if (
not join_select_node
or not SqlRewritingSubQueryReducerVisitor._is_simple_source(join_select_node)
# TODO: need something better here. Also don't reduce case statements? (not in joined logic)
# This logic might not be right anyway. Figure that out.
# or any(col.expr.as_window_function_expression for col in join_select_node.select_columns)
):
new_join_descs.append(join_desc)
continue

Expand Down
161 changes: 161 additions & 0 deletions tests_metricflow/integration/test_cases/itest_granularity.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -961,3 +961,164 @@ integration_test:
GROUP BY subq_2.martian_day
) subq_5
ON subq_6.metric_time__martian_day = subq_5.metric_time__martian_day
---
integration_test:
name: custom_offset_window
description: Test querying a metric with a custom offset window
model: SIMPLE_MODEL
metrics: ["bookings_offset_one_martian_day"]
group_bys: ["metric_time__day"]
check_query: |
WITH cte AS (
SELECT
martian_day AS ds__martian_day
, FIRST_VALUE(ds) OVER (
PARTITION BY martian_day
ORDER BY ds
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS ds__martian_day__first_value
, LAST_VALUE(ds) OVER (
PARTITION BY martian_day
ORDER BY ds
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS ds__martian_day__last_value
, ROW_NUMBER() OVER (
PARTITION BY martian_day
ORDER BY ds
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS ds__day__row_number
FROM {{ source_schema }}.mf_time_spine ts
)
SELECT
subq_10.metric_time__day
, SUM(1) AS bookings_offset_one_martian_day
FROM (
SELECT
CASE
WHEN subq_7.ds__martian_day__first_value__offset + INTERVAL (cte.ds__day__row_number - 1) day <= subq_7.ds__martian_day__last_value__offset
THEN subq_7.ds__martian_day__first_value__offset + INTERVAL (cte.ds__day__row_number - 1) day
ELSE subq_7.ds__martian_day__last_value__offset
END AS metric_time__day
FROM cte
INNER JOIN (
SELECT
ds__martian_day
, LAG(ds__martian_day__first_value, 1) OVER (
ORDER BY ds__martian_day
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS ds__martian_day__first_value__offset
, LAG(ds__martian_day__last_value, 1) OVER (
ORDER BY ds__martian_day
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS ds__martian_day__last_value__offset
FROM (
SELECT
ds__martian_day__first_value
, ds__martian_day__last_value
, ds__martian_day
FROM cte
GROUP BY
ds__martian_day__first_value
, ds__martian_day__last_value
, ds__martian_day
) subq_5
) subq_7
ON cte.ds__martian_day = subq_7.ds__martian_day
) subq_10
INNER JOIN {{ source_schema }}.fct_bookings b ON subq_10.metric_time__day = {{ render_date_trunc("b.ds", TimeGranularity.DAY) }}
GROUP BY subq_10.metric_time__day
---
integration_test:
name: custom_offset_window_with_grain_and_date_part
description: Test querying a metric with a custom offset window
model: SIMPLE_MODEL
metrics: ["bookings_offset_one_martian_day"]
group_by_objs: [{"name": "booking__ds", "grain": "week"}, {"name": "metric_time", "date_part": "month"}, {"name": "booking__ds", "grain": "martian_day"}]
check_query: |
WITH cte AS (
SELECT
martian_day AS ds__martian_day
, FIRST_VALUE(ds) OVER (
PARTITION BY martian_day
ORDER BY ds
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS ds__martian_day__first_value
, LAST_VALUE(ds) OVER (
PARTITION BY martian_day
ORDER BY ds
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS ds__martian_day__last_value
, ROW_NUMBER() OVER (
PARTITION BY martian_day
ORDER BY ds
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS ds__day__row_number
FROM {{ source_schema }}.mf_time_spine ts
)
SELECT
subq_11.martian_day AS booking__ds__martian_day
, subq_10.booking__ds__week
, subq_10.metric_time__extract_month
, SUM(1) AS bookings_offset_one_martian_day
FROM (
SELECT
CASE
WHEN subq_7.ds__martian_day__first_value__offset + INTERVAL (cte.ds__day__row_number - 1) day <= subq_7.ds__martian_day__last_value__offset
THEN subq_7.ds__martian_day__first_value__offset + INTERVAL (cte.ds__day__row_number - 1) day
ELSE subq_7.ds__martian_day__last_value__offset
END AS metric_time__day
, CASE
WHEN subq_7.ds__martian_day__first_value__offset + INTERVAL (cte.ds__day__row_number - 1) day <= subq_7.ds__martian_day__last_value__offset
THEN subq_7.ds__martian_day__first_value__offset + INTERVAL (cte.ds__day__row_number - 1) day
ELSE subq_7.ds__martian_day__last_value__offset
END AS booking__ds__day
, {{ render_date_trunc(
"""CASE
WHEN subq_7.ds__martian_day__first_value__offset + INTERVAL (cte.ds__day__row_number - 1) day <= subq_7.ds__martian_day__last_value__offset
THEN subq_7.ds__martian_day__first_value__offset + INTERVAL (cte.ds__day__row_number - 1) day
ELSE subq_7.ds__martian_day__last_value__offset
END"""
, TimeGranularity.WEEK
) }} AS booking__ds__week
, {{ render_extract(
"""CASE
WHEN subq_7.ds__martian_day__first_value__offset + INTERVAL (cte.ds__day__row_number - 1) day <= subq_7.ds__martian_day__last_value__offset
THEN subq_7.ds__martian_day__first_value__offset + INTERVAL (cte.ds__day__row_number - 1) day
ELSE subq_7.ds__martian_day__last_value__offset
END"""
, DatePart.MONTH
) }} AS metric_time__extract_month
FROM cte
INNER JOIN (
SELECT
ds__martian_day
, LAG(ds__martian_day__first_value, 1) OVER (
ORDER BY ds__martian_day
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS ds__martian_day__first_value__offset
, LAG(ds__martian_day__last_value, 1) OVER (
ORDER BY ds__martian_day
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS ds__martian_day__last_value__offset
FROM (
SELECT
ds__martian_day__first_value
, ds__martian_day__last_value
, ds__martian_day
FROM cte
GROUP BY
ds__martian_day__first_value
, ds__martian_day__last_value
, ds__martian_day
) subq_5
) subq_7
ON cte.ds__martian_day = subq_7.ds__martian_day
) subq_10
INNER JOIN {{ source_schema }}.fct_bookings b ON subq_10.metric_time__day = {{ render_date_trunc("b.ds", TimeGranularity.DAY) }}
LEFT OUTER JOIN {{ source_schema }}.mf_time_spine subq_11 ON subq_10.booking__ds__day = subq_11.ds
GROUP BY
subq_11.martian_day
, subq_10.booking__ds__week
, subq_10.metric_time__extract_month
1 change: 1 addition & 0 deletions tests_metricflow/integration/test_configured_cases.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ def test_case(
)

actual = query_result.result_df
# assert 0, query_result.sql

expected = sql_client.query(
jinja2.Template(
Expand Down
56 changes: 56 additions & 0 deletions tests_metricflow/query_rendering/test_custom_granularity.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
from _pytest.fixtures import FixtureRequest
from dbt_semantic_interfaces.implementations.filters.where_filter import PydanticWhereFilter
from dbt_semantic_interfaces.references import EntityReference
from dbt_semantic_interfaces.type_enums.date_part import DatePart
from dbt_semantic_interfaces.type_enums.time_granularity import TimeGranularity
from metricflow_semantics.query.query_parser import MetricFlowQueryParser
from metricflow_semantics.specs.metric_spec import MetricSpec
from metricflow_semantics.specs.query_param_implementations import TimeDimensionParameter
from metricflow_semantics.specs.query_spec import MetricFlowQuerySpec
from metricflow_semantics.specs.time_dimension_spec import TimeDimensionSpec
from metricflow_semantics.test_helpers.config_helpers import MetricFlowTestConfiguration
Expand Down Expand Up @@ -610,3 +612,57 @@ def test_join_to_timespine_metric_with_custom_granularity_filter_not_in_group_by
dataflow_plan_builder=dataflow_plan_builder,
query_spec=query_spec,
)


@pytest.mark.sql_engine_snapshot
def test_custom_offset_window( # noqa: D103
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
dataflow_plan_builder: DataflowPlanBuilder,
dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
sql_client: SqlClient,
query_parser: MetricFlowQueryParser,
) -> None:
query_spec = query_parser.parse_and_validate_query(
metric_names=("bookings_offset_one_martian_day",),
group_by_names=("metric_time__day",),
).query_spec

render_and_check(
request=request,
mf_test_configuration=mf_test_configuration,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
dataflow_plan_builder=dataflow_plan_builder,
query_spec=query_spec,
)


# TODO: prevent optimizer from collapsing case statement?
# TODO: get rid of second day column
@pytest.mark.sql_engine_snapshot
def test_custom_offset_window_with_granularity_and_date_part( # noqa: D103
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
dataflow_plan_builder: DataflowPlanBuilder,
dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
sql_client: SqlClient,
query_parser: MetricFlowQueryParser,
) -> None:
query_spec = query_parser.parse_and_validate_query(
metric_names=("bookings_offset_one_martian_day",),
group_by=(
TimeDimensionParameter(name="booking__ds", grain=TimeGranularity.MONTH.name),
TimeDimensionParameter(name="metric_time", date_part=DatePart.YEAR),
TimeDimensionParameter(name="metric_time", grain="martian_day"),
),
).query_spec

render_and_check(
request=request,
mf_test_configuration=mf_test_configuration,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
dataflow_plan_builder=dataflow_plan_builder,
query_spec=query_spec,
)
Loading

0 comments on commit f5b1e3a

Please sign in to comment.