Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consume new cumulative_type_params fields #1293

Merged
merged 13 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changes/unreleased/Under the Hood-20240618-161241.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Under the Hood
body: Consume cumulative-specific metric type params from new cumulative_type_params
field.
time: 2024-06-18T16:12:41.132445-07:00
custom:
Author: courtneyholcomb
Issue: "1293"
2 changes: 1 addition & 1 deletion extra-hatch-configuration/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Jinja2>=3.1.3
dbt-semantic-interfaces==0.6.0.dev2
dbt-semantic-interfaces==0.6.1
more-itertools>=8.10.0, <10.2.0
pydantic>=1.10.0, <1.11.0
tabulate>=0.8.9
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# dbt Cloud depends on metricflow-semantics (dependency set in dbt-mantle), so DSI must always point to a production version here.
dbt-semantic-interfaces>=0.5.0, <2.0.0
dbt-semantic-interfaces>=0.6.1, <2.0.0 # Pin to the first production release that includes cumulative_type_params support.
courtneyholcomb marked this conversation as resolved.
Show resolved Hide resolved
graphviz>=0.18.2, <0.21
python-dateutil>=2.9.0, <2.10.0
rapidfuzz>=3.0, <4.0
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from dbt_semantic_interfaces.transformations.convert_median import (
ConvertMedianToPercentileRule,
)
from dbt_semantic_interfaces.transformations.cumulative_type_params import SetCumulativeTypeParamsRule
from dbt_semantic_interfaces.transformations.names import LowerCaseNamesRule
from dbt_semantic_interfaces.transformations.proxy_measure import CreateProxyMeasureRule
from dbt_semantic_interfaces.transformations.semantic_manifest_transformer import (
Expand All @@ -25,6 +26,7 @@ def parse_manifest_from_dbt_generated_manifest(manifest_json_string: str) -> Pyd
# The serialized object in the dbt project does not have all transformations applied to it at
# this time, which causes failures with input measure resolution.
# TODO: remove this transform call once the upstream changes are integrated into our dependency tree
# TODO: align rules between DSI, here, and MFS (if possible!)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh wow. These were all supposed to be removed ages ago but I think we just always applied them. I don't know if we can stop now, I guess we'll see.

We might want to move this to the CLI package in the future.

rules = (
# Primary
(LowerCaseNamesRule(),),
Expand All @@ -35,6 +37,7 @@ def parse_manifest_from_dbt_generated_manifest(manifest_json_string: str) -> Pyd
ConvertCountToSumRule(),
ConvertMedianToPercentileRule(),
DedupeMetricInputMeasuresRule(), # Remove once fix is in core
SetCumulativeTypeParamsRule(),
),
)
model = PydanticSemanticManifestTransformer.transform(raw_model, rules)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,10 @@ def _metric_requires_metric_time(self, metric: Metric) -> bool:
metrics_to_check = [metric]
while metrics_to_check:
metric_to_check = metrics_to_check.pop()
if metric_to_check.type_params.window is not None or metric_to_check.type_params.grain_to_date is not None:
if metric_to_check.type_params.cumulative_type_params and (
metric_to_check.type_params.cumulative_type_params.window is not None
or metric_to_check.type_params.cumulative_type_params.grain_to_date is not None
):
return True
for input_metric in metric_to_check.input_metrics:
if input_metric.offset_window is not None or input_metric.offset_to_grain is not None:
Expand Down Expand Up @@ -497,9 +500,11 @@ def _get_metric_time_elements(self, measure_reference: Optional[MeasureReference
dimension_type=DimensionType.TIME,
entity_links=(),
join_path=SemanticModelJoinPath(
left_semantic_model_reference=measure_semantic_model.reference
if measure_semantic_model
else SemanticModelDerivation.VIRTUAL_SEMANTIC_MODEL_REFERENCE,
left_semantic_model_reference=(
measure_semantic_model.reference
if measure_semantic_model
else SemanticModelDerivation.VIRTUAL_SEMANTIC_MODEL_REFERENCE
),
),
# Anything that's not at the base time granularity of the measure's aggregation time dimension
# should be considered derived.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ def validate_metric_in_resolution_dag(
elif metric.type is MetricType.CUMULATIVE:
if (
metric.type_params is not None
and (metric.type_params.window is not None or metric.type_params.grain_to_date is not None)
and metric.type_params.cumulative_type_params is not None
and (
metric.type_params.cumulative_type_params.window is not None
or metric.type_params.cumulative_type_params.grain_to_date is not None
)
and not query_includes_metric_time_or_agg_time_dimension
):
return MetricFlowQueryResolutionIssueSet.from_issue(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ metric:
type: cumulative
type_params:
measure: monthly_measure_0
window: 2 months
cumulative_type_params:
window: 2 months
---
metric:
name: derived_metric_with_common_filtered_metric_0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ metric:
type: cumulative
type_params:
measure: bookings
window: 7 days
cumulative_type_params:
window: 7 days
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ metric:
type: cumulative
type_params:
measure: bookings_monthly
window: 3 month
cumulative_type_params:
window: 3 month
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,18 @@ metric:
type: cumulative
type_params:
measure: txn_revenue
window: 2 month
cumulative_type_params:
window: 2 month
period_agg: average
---
metric:
name: "revenue_all_time"
description: "revenue_all_time"
type: cumulative
type_params:
measure: txn_revenue
cumulative_type_params:
period_agg: last
---
metric:
name: "every_two_days_bookers"
Expand All @@ -158,7 +162,8 @@ metric:
type: cumulative
type_params:
measure: txn_revenue
grain_to_date: month
cumulative_type_params:
grain_to_date: month
---
metric:
name: booking_fees
Expand Down Expand Up @@ -629,7 +634,8 @@ metric:
name: bookers
join_to_timespine: true
fill_nulls_with: 0
window: 2 days
cumulative_type_params:
window: 2 days
---
metric:
name: bookings_growth_2_weeks_fill_nulls_with_0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@
type: cumulative
type_params:
measure: revenue
window: 7 days
cumulative_type_params:
window: 7 days
---
metric:
name: revenue_sub_10
Expand Down
12 changes: 10 additions & 2 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,8 +495,16 @@ def _build_base_metric_output_node(
child_metric_offset_to_grain=metric_spec.offset_to_grain,
cumulative_description=(
CumulativeMeasureDescription(
cumulative_window=metric.type_params.window,
cumulative_grain_to_date=metric.type_params.grain_to_date,
cumulative_window=(
metric.type_params.cumulative_type_params.window
if metric.type_params.cumulative_type_params
else None
),
cumulative_grain_to_date=(
metric.type_params.cumulative_type_params.grain_to_date
if metric.type_params.cumulative_type_params
else None
),
)
if metric.type is MetricType.CUMULATIVE
else None
Expand Down
16 changes: 9 additions & 7 deletions metricflow/plan_conversion/dataflow_to_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from dbt_semantic_interfaces.references import EntityReference, MetricModelReference
from dbt_semantic_interfaces.type_enums.aggregation_type import AggregationType
from dbt_semantic_interfaces.type_enums.conversion_calculation_type import ConversionCalculationType
from dbt_semantic_interfaces.type_enums.period_agg import PeriodAggregation
from dbt_semantic_interfaces.validations.unique_valid_name import MetricFlowReservedKeywords
from metricflow_semantics.aggregation_properties import AggregationState
from metricflow_semantics.dag.id_prefix import StaticIdPrefix
Expand Down Expand Up @@ -1635,13 +1636,14 @@ def visit_window_reaggregation_node(self, node: WindowReaggregationNode) -> SqlD
f"specs: {expected_specs}. Got: {parent_instance_set.as_tuple}."
)

# Pending DSI upgrade:
# sql_window_function = SqlWindowFunction[
# self._metric_lookup.get_metric(
# metric_instance.spec.reference
# ).type_params.cumulative_type_params.period_agg.name
# ]
sql_window_function = SqlWindowFunction.FIRST_VALUE # placeholder for now
cumulative_type_params = self._metric_lookup.get_metric(
metric_instance.spec.reference
).type_params.cumulative_type_params
sql_window_function = SqlWindowFunction.get_window_function_for_period_agg(
cumulative_type_params.period_agg
if cumulative_type_params and cumulative_type_params.period_agg
else PeriodAggregation.FIRST
)
order_by_args = []
if sql_window_function.requires_ordering:
order_by_args.append(
Expand Down
13 changes: 13 additions & 0 deletions metricflow/sql/sql_exprs.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from dbt_semantic_interfaces.protocols.measure import MeasureAggregationParameters
from dbt_semantic_interfaces.type_enums.aggregation_type import AggregationType
from dbt_semantic_interfaces.type_enums.date_part import DatePart
from dbt_semantic_interfaces.type_enums.period_agg import PeriodAggregation
from dbt_semantic_interfaces.type_enums.time_granularity import TimeGranularity
from metricflow_semantics.dag.id_prefix import IdPrefix, StaticIdPrefix
from metricflow_semantics.dag.mf_dag import DagNode, DisplayedProperty, NodeId
Expand Down Expand Up @@ -967,6 +968,18 @@ def requires_ordering(self) -> bool:
else:
assert_values_exhausted(self)

@classmethod
def get_window_function_for_period_agg(cls, period_agg: PeriodAggregation) -> SqlWindowFunction:
    """Map a cumulative-metric period aggregation option to its SQL window function.

    Covers every `PeriodAggregation` member; an unhandled value trips the
    exhaustiveness assertion so new enum options can't be silently ignored.
    """
    if period_agg is PeriodAggregation.AVERAGE:
        return cls.AVERAGE
    if period_agg is PeriodAggregation.LAST:
        return cls.LAST_VALUE
    if period_agg is PeriodAggregation.FIRST:
        return cls.FIRST_VALUE
    assert_values_exhausted(period_agg)


@dataclass(frozen=True)
class SqlWindowOrderByArgument:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ integration_test:
SELECT
metric_time__week
, metric_time__quarter
, FIRST_VALUE(revenue_all_time) OVER (
, LAST_VALUE(revenue_all_time) OVER (
PARTITION BY metric_time__week, metric_time__quarter
ORDER BY metric_time__day
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
Expand Down Expand Up @@ -467,7 +467,7 @@ integration_test:
SELECT
revenue_instance__ds__month
, metric_time__week
, FIRST_VALUE(trailing_2_months_revenue) OVER (
, AVG(trailing_2_months_revenue) OVER (
PARTITION BY revenue_instance__ds__month, metric_time__week
ORDER BY metric_time__day
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
Expand Down Expand Up @@ -562,7 +562,7 @@ integration_test:
SELECT
metric_time__month
, metric_time__year
, FIRST_VALUE(t2mr) OVER (
, AVG(t2mr) OVER (
PARTITION BY metric_time__month, metric_time__year
ORDER BY metric_time__day
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,6 @@ def test_derived_cumulative_metric_with_non_default_grains(


# TODO: write the following tests when unblocked
# - Render each of the allowed period_aggs (both set in YAML & default)
# - Query cumulative metric with non-day default_grain (using default grain and non-default grain)
# - Query 2 metrics with different default_grains using metric_time (no grain specified)
# - If default grain is WEEK, query with a higher grain (check that we still get correct values)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ FROM (
SELECT
subq_7.metric_time__week
, subq_7.metric_time__quarter
, FIRST_VALUE(subq_7.revenue_all_time) OVER (
, LAST_VALUE(subq_7.revenue_all_time) OVER (
PARTITION BY
subq_7.metric_time__week
, subq_7.metric_time__quarter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ FROM (
SELECT
metric_time__week
, metric_time__quarter
, FIRST_VALUE(revenue_all_time) OVER (
, LAST_VALUE(revenue_all_time) OVER (
PARTITION BY
metric_time__week
, metric_time__quarter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ FROM (
-- Window Function for Metric Re-aggregation
SELECT
subq_7.metric_time__week
, FIRST_VALUE(subq_7.revenue_all_time) OVER (
, LAST_VALUE(subq_7.revenue_all_time) OVER (
PARTITION BY subq_7.metric_time__week
ORDER BY subq_7.metric_time__day
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ FROM (
-- Window Function for Metric Re-aggregation
SELECT
metric_time__week
, FIRST_VALUE(revenue_all_time) OVER (
, LAST_VALUE(revenue_all_time) OVER (
PARTITION BY metric_time__week
ORDER BY metric_time__day
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,7 @@ FROM (
-- Window Function for Metric Re-aggregation
SELECT
subq_7.metric_time__week
, FIRST_VALUE(subq_7.t2mr) OVER (
PARTITION BY subq_7.metric_time__week
ORDER BY subq_7.metric_time__day
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS t2mr
, AVG(subq_7.t2mr) OVER (PARTITION BY subq_7.metric_time__week) AS t2mr
FROM (
-- Compute Metrics via Expressions
SELECT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,14 @@ FROM (
-- Window Function for Metric Re-aggregation
SELECT
metric_time__week
, FIRST_VALUE(t2mr) OVER (
PARTITION BY metric_time__week
ORDER BY metric_time__day
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS t2mr
, AVG(t2mr) OVER (PARTITION BY metric_time__week) AS t2mr
FROM (
-- Join Self Over Time Range
-- Pass Only Elements: ['txn_revenue', 'metric_time__week', 'metric_time__day']
-- Aggregate Measures
-- Compute Metrics via Expressions
SELECT
subq_12.metric_time__day AS metric_time__day
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait, this doesn't look right. We're now doing the initial aggregation against WEEK instead of DAY, and then taking the average of those grouped by week, which is effectively a no-op. Is that expected?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shoot, you're right. I wonder if the column pruner got to it - investigating 🧐

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely seems like an optimizer issue since the non-optimized SQL looks right.

Copy link
Contributor Author

@courtneyholcomb courtneyholcomb Jun 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tlento fixed this bug in the latest 3 commits.

, subq_12.metric_time__week AS metric_time__week
subq_12.metric_time__week AS metric_time__week
, SUM(revenue_src_28000.revenue) AS t2mr
FROM (
-- Time Spine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,7 @@ FROM (
-- Window Function for Metric Re-aggregation
SELECT
subq_7.metric_time__year
, FIRST_VALUE(subq_7.trailing_2_months_revenue) OVER (
PARTITION BY subq_7.metric_time__year
ORDER BY subq_7.metric_time__day
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS trailing_2_months_revenue
, AVG(subq_7.trailing_2_months_revenue) OVER (PARTITION BY subq_7.metric_time__year) AS trailing_2_months_revenue
FROM (
-- Compute Metrics via Expressions
SELECT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,14 @@ FROM (
-- Window Function for Metric Re-aggregation
SELECT
metric_time__year
, FIRST_VALUE(trailing_2_months_revenue) OVER (
PARTITION BY metric_time__year
ORDER BY metric_time__day
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS trailing_2_months_revenue
, AVG(trailing_2_months_revenue) OVER (PARTITION BY metric_time__year) AS trailing_2_months_revenue
FROM (
-- Join Self Over Time Range
-- Pass Only Elements: ['txn_revenue', 'metric_time__year', 'metric_time__day']
-- Aggregate Measures
-- Compute Metrics via Expressions
SELECT
subq_11.metric_time__day AS metric_time__day
, subq_11.metric_time__year AS metric_time__year
subq_11.metric_time__year AS metric_time__year
, SUM(revenue_src_28000.revenue) AS trailing_2_months_revenue
FROM (
-- Time Spine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ FROM (
SELECT
subq_7.metric_time__week
, subq_7.metric_time__quarter
, FIRST_VALUE(subq_7.revenue_all_time) OVER (
, LAST_VALUE(subq_7.revenue_all_time) OVER (
PARTITION BY
subq_7.metric_time__week
, subq_7.metric_time__quarter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ FROM (
SELECT
metric_time__week
, metric_time__quarter
, FIRST_VALUE(revenue_all_time) OVER (
, LAST_VALUE(revenue_all_time) OVER (
PARTITION BY
metric_time__week
, metric_time__quarter
Expand Down
Loading
Loading