Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fill nulls for multi-metric queries #850

Merged
merged 9 commits into from
Nov 8, 2023
45 changes: 34 additions & 11 deletions metricflow/plan_conversion/dataflow_to_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

import logging
from collections import OrderedDict
from typing import List, Optional, Sequence, Union
from typing import List, Optional, Sequence, Tuple, Union

from dbt_semantic_interfaces.enum_extension import assert_values_exhausted
from dbt_semantic_interfaces.protocols.metric import MetricInputMeasure, MetricType
from dbt_semantic_interfaces.references import MetricModelReference
from dbt_semantic_interfaces.references import MetricModelReference, MetricReference
from dbt_semantic_interfaces.type_enums.aggregation_type import AggregationType

from metricflow.aggregation_properties import AggregationState
Expand Down Expand Up @@ -862,24 +862,25 @@ def visit_where_constraint_node(self, node: WhereConstraintNode) -> SqlDataSet:
),
)

def _make_select_columns_for_metrics(
def _make_select_columns_for_multiple_metrics(
self,
table_alias_to_metric_specs: OrderedDict[str, Sequence[MetricSpec]],
table_alias_to_metric_instances: OrderedDict[str, Tuple[MetricInstance, ...]],
aggregation_type: Optional[AggregationType],
) -> List[SqlSelectColumn]:
"""Creates select columns that get the given metric using the given table alias.

e.g.

with table_alias_to_metric_specs = {"a": MetricSpec(element_name="bookings")}
with table_alias_to_metric_instances = {"a": MetricSpec(element_name="bookings")}

->

a.bookings AS bookings
"""
select_columns = []
for table_alias, metric_specs in table_alias_to_metric_specs.items():
for metric_spec in metric_specs:
for table_alias, metric_instances in table_alias_to_metric_instances.items():
for metric_instance in metric_instances:
metric_spec = metric_instance.spec
metric_column_name = self._column_association_resolver.resolve_spec(metric_spec).column_name
column_reference_expression = SqlColumnReferenceExpression(
col_ref=SqlColumnReference(
Expand All @@ -894,6 +895,27 @@ def _make_select_columns_for_metrics(
else:
select_expression = column_reference_expression

# At this point, the MetricSpec might have the alias in place of the element name, so we need to look
# back at where it was defined from to get the metric element name.
metric_name = metric_instance.defined_from.metric_name
input_measures = self._metric_lookup.measures_for_metric(
metric_reference=MetricReference(metric_name),
column_association_resolver=self._column_association_resolver,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another way to deal with the issue below is to add a different, internal API here that only gives the measures that are direct inputs into the metric. That way, if we add another metric type that takes measures as input anything derived from it won't have anything to do here, and we ensure we only do the coalesce once within a derived metric chain.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that idea! I'll add that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added that!

# If multiple input measures, this is a query with a nested derived/ratio metric. In that case, we can
# skip this step because it has already occurred when rendering the nested metric.
# TODO: this logic might need updating for conversion metrics, which will allow multiple input measures
courtneyholcomb marked this conversation as resolved.
Show resolved Hide resolved
if len(input_measures) == 1:
input_measure = input_measures[0]
if input_measure.fill_nulls_with is not None:
select_expression = SqlAggregateFunctionExpression(
sql_function=SqlFunction.COALESCE,
sql_function_args=[
select_expression,
SqlStringExpression(str(input_measure.fill_nulls_with)),
],
)

select_columns.append(
SqlSelectColumn(
expr=select_expression,
Expand Down Expand Up @@ -938,13 +960,13 @@ def visit_combine_metrics_node(self, node: CombineMetricsNode) -> SqlDataSet:
), "Shouldn't have a CombineMetricsNode in the dataflow plan if there's only 1 parent."

parent_data_sets: List[AnnotatedSqlDataSet] = []
table_alias_to_metric_specs: OrderedDict[str, Sequence[MetricSpec]] = OrderedDict()
table_alias_to_metric_instances: OrderedDict[str, Tuple[MetricInstance, ...]] = OrderedDict()

for parent_node in node.parent_nodes:
parent_sql_data_set = parent_node.accept(self)
table_alias = self._next_unique_table_alias()
parent_data_sets.append(AnnotatedSqlDataSet(data_set=parent_sql_data_set, alias=table_alias))
table_alias_to_metric_specs[table_alias] = parent_sql_data_set.instance_set.spec_set.metric_specs
table_alias_to_metric_instances[table_alias] = parent_sql_data_set.instance_set.metric_instances

# When we create the components of the join that combines metrics it will be one of INNER, FULL OUTER,
# or CROSS JOIN. Order doesn't matter for these join types, so we will use the first element in the FROM
Expand Down Expand Up @@ -986,8 +1008,9 @@ def visit_combine_metrics_node(self, node: CombineMetricsNode) -> SqlDataSet:

metric_aggregation_type = AggregationType.MAX
metric_select_column_set = SelectColumnSet(
metric_columns=self._make_select_columns_for_metrics(
table_alias_to_metric_specs, aggregation_type=metric_aggregation_type
metric_columns=self._make_select_columns_for_multiple_metrics(
table_alias_to_metric_instances=table_alias_to_metric_instances,
aggregation_type=metric_aggregation_type,
)
)
linkable_select_column_set = linkable_spec_set.transform(
Expand Down
105 changes: 104 additions & 1 deletion metricflow/test/integration/test_cases/itest_metrics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1280,7 +1280,7 @@ integration_test:
FROM (
SELECT
subq_7.metric_time__day
, subq_7.bookings_fill_nulls_with_0 AS bookings_fill_nulls_with_0
, COALESCE(MAX(subq_7.bookings_fill_nulls_with_0), 0) AS bookings_fill_nulls_with_0
, subq_15.bookings_2_weeks_ago AS bookings_2_weeks_ago
FROM (
SELECT
Expand Down Expand Up @@ -1312,4 +1312,107 @@ integration_test:
GROUP BY subq_11.ds
) subq_15
ON subq_7.metric_time__day = subq_15.metric_time__day
GROUP BY 1, 3
) subq_16
---
integration_test:
name: fill_nulls_with_0_multi_metric_query
description: Test a multi-metric query that fills nulls
model: SIMPLE_MODEL
metrics: ["bookings_fill_nulls_with_0", "views"]
group_by_objs: [{"name": "metric_time"}]
check_query: |
SELECT
COALESCE(subq_7.metric_time__day, subq_12.metric_time__day) AS metric_time__day
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there missing values in this date sequence for the bookings_fill_nulls_with metric? If so that's great. An output test would be nice but at least we can inspect the data frame from here and see it.

If not, it might be worth adding a dimension that has a gap for a given metric_time, dimension_value pair.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a new test case & updated the source data to make sure there were some nulls filled for this case.
Will plan to add some output tests tomorrow!

, COALESCE(MAX(subq_7.bookings_fill_nulls_with_0), 0) AS bookings_fill_nulls_with_0
, MAX(subq_12.views) AS views
FROM (
SELECT
subq_5.ds AS metric_time__day
, COALESCE(subq_3.bookings, 0) AS bookings_fill_nulls_with_0
FROM {{ source_schema }}.mf_time_spine subq_5
LEFT OUTER JOIN (
SELECT
{{ render_date_trunc("ds", TimeGranularity.DAY) }} AS metric_time__day
, SUM(1) AS bookings
FROM {{ source_schema }}.fct_bookings bookings_source_src_1
GROUP BY metric_time__day
) subq_3
ON subq_5.ds = subq_3.metric_time__day
) subq_7
FULL OUTER JOIN (
SELECT
{{ render_date_trunc("ds", TimeGranularity.DAY) }} AS metric_time__day
, SUM(1) AS views
FROM {{ source_schema }}.fct_views views_source_src_9
GROUP BY metric_time__day
) subq_12
ON subq_7.metric_time__day = subq_12.metric_time__day
GROUP BY COALESCE(subq_7.metric_time__day, subq_12.metric_time__day)
---
integration_test:
name: fill_nulls_with_0_multi_metric_query_with_nesting
description: Test a multi-metric query with a nested join that fills nulls
model: SIMPLE_MODEL
metrics: ["bookings_growth_2_weeks_fill_nulls_with_0_for_non_offset", "booking_value"]
group_by_objs: [{"name": "metric_time"}]
check_query: |
SELECT
COALESCE(subq_17.metric_time__day, subq_22.metric_time__day) AS metric_time__day
, MAX(subq_17.bookings_growth_2_weeks_fill_nulls_with_0_for_non_offset) AS bookings_growth_2_weeks_fill_nulls_with_0_for_non_offset
, MAX(subq_22.booking_value) AS booking_value
FROM (
SELECT
metric_time__day
, bookings_fill_nulls_with_0 - bookings_2_weeks_ago AS bookings_growth_2_weeks_fill_nulls_with_0_for_non_offset
FROM (
SELECT
COALESCE(subq_7.metric_time__day, subq_15.metric_time__day) AS metric_time__day
, COALESCE(MAX(subq_7.bookings_fill_nulls_with_0), 0) AS bookings_fill_nulls_with_0
, MAX(subq_15.bookings_2_weeks_ago) AS bookings_2_weeks_ago
FROM (
SELECT
metric_time__day
, COALESCE(bookings, 0) AS bookings_fill_nulls_with_0
FROM (
SELECT
subq_5.ds AS metric_time__day
, subq_3.bookings AS bookings
FROM {{ source_schema }}.mf_time_spine subq_5
LEFT OUTER JOIN (
SELECT
{{ render_date_trunc("ds", TimeGranularity.DAY) }} AS metric_time__day
, SUM(1) AS bookings
FROM {{ source_schema }}.fct_bookings bookings_source_src_1
GROUP BY metric_time__day
) subq_3
ON subq_5.ds = subq_3.metric_time__day
) subq_6
) subq_7
FULL OUTER JOIN (
SELECT
subq_11.ds AS metric_time__day
, SUM(subq_9.bookings) AS bookings_2_weeks_ago
FROM {{ source_schema }}.mf_time_spine subq_11
INNER JOIN (
SELECT
{{ render_date_trunc("ds", TimeGranularity.DAY) }} AS metric_time__day
, 1 AS bookings
FROM {{ source_schema }}.fct_bookings bookings_source_src_1
) subq_9
ON {{ render_date_sub("subq_11", "ds", 14, TimeGranularity.DAY) }} = subq_9.metric_time__day
GROUP BY subq_11.ds
) subq_15
ON subq_7.metric_time__day = subq_15.metric_time__day
GROUP BY COALESCE(subq_7.metric_time__day, subq_15.metric_time__day)
) subq_16
) subq_17
FULL OUTER JOIN (
SELECT
{{ render_date_trunc("ds", TimeGranularity.DAY) }} AS metric_time__day
, SUM(booking_value) AS booking_value
FROM {{ source_schema }}.fct_bookings bookings_source_src_1
GROUP BY {{ render_date_trunc("ds", TimeGranularity.DAY) }}
) subq_22
ON subq_17.metric_time__day = subq_22.metric_time__day
GROUP BY COALESCE(subq_17.metric_time__day, subq_22.metric_time__day)
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ FROM (
-- Combine Metrics
SELECT
COALESCE(subq_7.metric_time__day, subq_15.metric_time__day) AS metric_time__day
, MAX(subq_7.bookings_fill_nulls_with_0) AS bookings_fill_nulls_with_0
, COALESCE(MAX(subq_7.bookings_fill_nulls_with_0), 0) AS bookings_fill_nulls_with_0
, MAX(subq_15.bookings_2_weeks_ago) AS bookings_2_weeks_ago
FROM (
-- Compute Metrics via Expressions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ FROM (
-- Combine Metrics
SELECT
COALESCE(subq_24.metric_time__day, subq_32.metric_time__day) AS metric_time__day
, MAX(subq_24.bookings_fill_nulls_with_0) AS bookings_fill_nulls_with_0
, COALESCE(MAX(subq_24.bookings_fill_nulls_with_0), 0) AS bookings_fill_nulls_with_0
, MAX(subq_32.bookings_2_weeks_ago) AS bookings_2_weeks_ago
FROM (
-- Compute Metrics via Expressions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ FROM (
-- Combine Metrics
SELECT
COALESCE(subq_7.metric_time__day, subq_15.metric_time__day) AS metric_time__day
, MAX(subq_7.bookings_fill_nulls_with_0) AS bookings_fill_nulls_with_0
, COALESCE(MAX(subq_7.bookings_fill_nulls_with_0), 0) AS bookings_fill_nulls_with_0
, MAX(subq_15.bookings_2_weeks_ago) AS bookings_2_weeks_ago
FROM (
-- Compute Metrics via Expressions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ FROM (
-- Combine Metrics
SELECT
COALESCE(subq_24.metric_time__day, subq_32.metric_time__day) AS metric_time__day
, MAX(subq_24.bookings_fill_nulls_with_0) AS bookings_fill_nulls_with_0
, COALESCE(MAX(subq_24.bookings_fill_nulls_with_0), 0) AS bookings_fill_nulls_with_0
, MAX(subq_32.bookings_2_weeks_ago) AS bookings_2_weeks_ago
FROM (
-- Compute Metrics via Expressions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ FROM (
-- Combine Metrics
SELECT
COALESCE(subq_7.metric_time__day, subq_15.metric_time__day) AS metric_time__day
, MAX(subq_7.bookings_fill_nulls_with_0) AS bookings_fill_nulls_with_0
, COALESCE(MAX(subq_7.bookings_fill_nulls_with_0), 0) AS bookings_fill_nulls_with_0
, MAX(subq_15.bookings_2_weeks_ago) AS bookings_2_weeks_ago
FROM (
-- Compute Metrics via Expressions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ FROM (
-- Combine Metrics
SELECT
COALESCE(subq_24.metric_time__day, subq_32.metric_time__day) AS metric_time__day
, MAX(subq_24.bookings_fill_nulls_with_0) AS bookings_fill_nulls_with_0
, COALESCE(MAX(subq_24.bookings_fill_nulls_with_0), 0) AS bookings_fill_nulls_with_0
, MAX(subq_32.bookings_2_weeks_ago) AS bookings_2_weeks_ago
FROM (
-- Compute Metrics via Expressions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ FROM (
-- Combine Metrics
SELECT
COALESCE(subq_7.metric_time__day, subq_15.metric_time__day) AS metric_time__day
, MAX(subq_7.bookings_fill_nulls_with_0) AS bookings_fill_nulls_with_0
, COALESCE(MAX(subq_7.bookings_fill_nulls_with_0), 0) AS bookings_fill_nulls_with_0
, MAX(subq_15.bookings_2_weeks_ago) AS bookings_2_weeks_ago
FROM (
-- Compute Metrics via Expressions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ FROM (
-- Combine Metrics
SELECT
COALESCE(subq_24.metric_time__day, subq_32.metric_time__day) AS metric_time__day
, MAX(subq_24.bookings_fill_nulls_with_0) AS bookings_fill_nulls_with_0
, COALESCE(MAX(subq_24.bookings_fill_nulls_with_0), 0) AS bookings_fill_nulls_with_0
, MAX(subq_32.bookings_2_weeks_ago) AS bookings_2_weeks_ago
FROM (
-- Compute Metrics via Expressions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ FROM (
-- Combine Metrics
SELECT
COALESCE(subq_7.metric_time__day, subq_15.metric_time__day) AS metric_time__day
, MAX(subq_7.bookings_fill_nulls_with_0) AS bookings_fill_nulls_with_0
, COALESCE(MAX(subq_7.bookings_fill_nulls_with_0), 0) AS bookings_fill_nulls_with_0
, MAX(subq_15.bookings_2_weeks_ago) AS bookings_2_weeks_ago
FROM (
-- Compute Metrics via Expressions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ FROM (
-- Combine Metrics
SELECT
COALESCE(subq_24.metric_time__day, subq_32.metric_time__day) AS metric_time__day
, MAX(subq_24.bookings_fill_nulls_with_0) AS bookings_fill_nulls_with_0
, COALESCE(MAX(subq_24.bookings_fill_nulls_with_0), 0) AS bookings_fill_nulls_with_0
, MAX(subq_32.bookings_2_weeks_ago) AS bookings_2_weeks_ago
FROM (
-- Compute Metrics via Expressions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ FROM (
-- Combine Metrics
SELECT
COALESCE(subq_7.metric_time__day, subq_15.metric_time__day) AS metric_time__day
, MAX(subq_7.bookings_fill_nulls_with_0) AS bookings_fill_nulls_with_0
, COALESCE(MAX(subq_7.bookings_fill_nulls_with_0), 0) AS bookings_fill_nulls_with_0
, MAX(subq_15.bookings_2_weeks_ago) AS bookings_2_weeks_ago
FROM (
-- Compute Metrics via Expressions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ FROM (
-- Combine Metrics
SELECT
COALESCE(subq_24.metric_time__day, subq_32.metric_time__day) AS metric_time__day
, MAX(subq_24.bookings_fill_nulls_with_0) AS bookings_fill_nulls_with_0
, COALESCE(MAX(subq_24.bookings_fill_nulls_with_0), 0) AS bookings_fill_nulls_with_0
, MAX(subq_32.bookings_2_weeks_ago) AS bookings_2_weeks_ago
FROM (
-- Compute Metrics via Expressions
Expand Down
Loading