-
Notifications
You must be signed in to change notification settings - Fork 97
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fill nulls for multi-metric queries #850
Changes from 4 commits
4de5d57
5cd869e
e8e1b17
304b730
6abdf1c
affea74
b8bc8dc
aa32059
6545bc1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
kind: Features | ||
body: Fill nulls for multi-metric queries | ||
time: 2023-11-06T15:00:14.37926-08:00 | ||
custom: | ||
Author: courtneyholcomb | ||
Issue: "850" |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,11 +2,11 @@ | |
|
||
import logging | ||
from collections import OrderedDict | ||
from typing import List, Optional, Sequence, Union | ||
from typing import List, Optional, Sequence, Tuple, Union | ||
|
||
from dbt_semantic_interfaces.enum_extension import assert_values_exhausted | ||
from dbt_semantic_interfaces.protocols.metric import MetricInputMeasure, MetricType | ||
from dbt_semantic_interfaces.references import MetricModelReference | ||
from dbt_semantic_interfaces.references import MetricModelReference, MetricReference | ||
from dbt_semantic_interfaces.type_enums.aggregation_type import AggregationType | ||
|
||
from metricflow.aggregation_properties import AggregationState | ||
|
@@ -706,7 +706,7 @@ def visit_compute_metrics_node(self, node: ComputeMetricsNode) -> SqlDataSet: | |
metric_instances.append( | ||
MetricInstance( | ||
associated_columns=(output_column_association,), | ||
defined_from=(MetricModelReference(metric_name=metric_spec.element_name),), | ||
defined_from=MetricModelReference(metric_name=metric_spec.element_name), | ||
spec=metric_spec.alias_spec, | ||
) | ||
) | ||
|
@@ -862,24 +862,25 @@ def visit_where_constraint_node(self, node: WhereConstraintNode) -> SqlDataSet: | |
), | ||
) | ||
|
||
def _make_select_columns_for_metrics( | ||
def _make_select_columns_for_multiple_metrics( | ||
self, | ||
table_alias_to_metric_specs: OrderedDict[str, Sequence[MetricSpec]], | ||
table_alias_to_metric_instances: OrderedDict[str, Tuple[MetricInstance, ...]], | ||
aggregation_type: Optional[AggregationType], | ||
) -> List[SqlSelectColumn]: | ||
"""Creates select columns that get the given metric using the given table alias. | ||
|
||
e.g. | ||
|
||
with table_alias_to_metric_specs = {"a": MetricSpec(element_name="bookings")} | ||
with table_alias_to_metric_instances = {"a": MetricSpec(element_name="bookings")} | ||
|
||
-> | ||
|
||
a.bookings AS bookings | ||
""" | ||
select_columns = [] | ||
for table_alias, metric_specs in table_alias_to_metric_specs.items(): | ||
for metric_spec in metric_specs: | ||
for table_alias, metric_instances in table_alias_to_metric_instances.items(): | ||
for metric_instance in metric_instances: | ||
metric_spec = metric_instance.spec | ||
metric_column_name = self._column_association_resolver.resolve_spec(metric_spec).column_name | ||
column_reference_expression = SqlColumnReferenceExpression( | ||
col_ref=SqlColumnReference( | ||
|
@@ -894,6 +895,27 @@ def _make_select_columns_for_metrics( | |
else: | ||
select_expression = column_reference_expression | ||
|
||
# At this point, the MetricSpec might have the alias in place of the element name, so we need to look | ||
# back at where it was defined from to get the metric element name. | ||
metric_name = metric_instance.defined_from.metric_name | ||
input_measures = self._metric_lookup.measures_for_metric( | ||
metric_reference=MetricReference(metric_name), | ||
column_association_resolver=self._column_association_resolver, | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another way to deal with the issue below is to add a different, internal API here that only gives the measures that are direct inputs into the metric. That way, if we add another metric type that takes measures as input anything derived from it won't have anything to do here, and we ensure we only do the coalesce once within a derived metric chain. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like that idea! I'll add that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added that! |
||
# If multiple input measures, this is a query with a nested derived/ratio metric. In that case, we can | ||
# skip this step because it has already occurred when rendering the nested metric. | ||
# TODO: this logic might need updating for conversion metrics, which will allow multiple input measures | ||
courtneyholcomb marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if len(input_measures) == 1: | ||
input_measure = input_measures[0] | ||
if input_measure.fill_nulls_with is not None: | ||
select_expression = SqlAggregateFunctionExpression( | ||
sql_function=SqlFunction.COALESCE, | ||
sql_function_args=[ | ||
select_expression, | ||
SqlStringExpression(str(input_measure.fill_nulls_with)), | ||
], | ||
) | ||
|
||
select_columns.append( | ||
SqlSelectColumn( | ||
expr=select_expression, | ||
|
@@ -938,13 +960,13 @@ def visit_combine_metrics_node(self, node: CombineMetricsNode) -> SqlDataSet: | |
), "Shouldn't have a CombineMetricsNode in the dataflow plan if there's only 1 parent." | ||
|
||
parent_data_sets: List[AnnotatedSqlDataSet] = [] | ||
table_alias_to_metric_specs: OrderedDict[str, Sequence[MetricSpec]] = OrderedDict() | ||
table_alias_to_metric_instances: OrderedDict[str, Tuple[MetricInstance, ...]] = OrderedDict() | ||
|
||
for parent_node in node.parent_nodes: | ||
parent_sql_data_set = parent_node.accept(self) | ||
table_alias = self._next_unique_table_alias() | ||
parent_data_sets.append(AnnotatedSqlDataSet(data_set=parent_sql_data_set, alias=table_alias)) | ||
table_alias_to_metric_specs[table_alias] = parent_sql_data_set.instance_set.spec_set.metric_specs | ||
table_alias_to_metric_instances[table_alias] = parent_sql_data_set.instance_set.metric_instances | ||
|
||
# When we create the components of the join that combines metrics it will be one of INNER, FULL OUTER, | ||
# or CROSS JOIN. Order doesn't matter for these join types, so we will use the first element in the FROM | ||
|
@@ -986,8 +1008,9 @@ def visit_combine_metrics_node(self, node: CombineMetricsNode) -> SqlDataSet: | |
|
||
metric_aggregation_type = AggregationType.MAX | ||
metric_select_column_set = SelectColumnSet( | ||
metric_columns=self._make_select_columns_for_metrics( | ||
table_alias_to_metric_specs, aggregation_type=metric_aggregation_type | ||
metric_columns=self._make_select_columns_for_multiple_metrics( | ||
table_alias_to_metric_instances=table_alias_to_metric_instances, | ||
aggregation_type=metric_aggregation_type, | ||
) | ||
) | ||
linkable_select_column_set = linkable_spec_set.transform( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -436,7 +436,7 @@ def from_reference(reference: MetricReference) -> MetricSpec: | |
|
||
@property | ||
def alias_spec(self) -> MetricSpec: | ||
"""Returns a MetricSpec represneting the alias state.""" | ||
"""Returns a MetricSpec representing the alias state.""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 😸 |
||
return MetricSpec( | ||
element_name=self.alias or self.element_name, | ||
constraint=self.constraint, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1280,7 +1280,7 @@ integration_test: | |
FROM ( | ||
SELECT | ||
subq_7.metric_time__day | ||
, subq_7.bookings_fill_nulls_with_0 AS bookings_fill_nulls_with_0 | ||
, COALESCE(MAX(subq_7.bookings_fill_nulls_with_0), 0) AS bookings_fill_nulls_with_0 | ||
, subq_15.bookings_2_weeks_ago AS bookings_2_weeks_ago | ||
FROM ( | ||
SELECT | ||
|
@@ -1312,4 +1312,107 @@ integration_test: | |
GROUP BY subq_11.ds | ||
) subq_15 | ||
ON subq_7.metric_time__day = subq_15.metric_time__day | ||
GROUP BY 1, 3 | ||
) subq_16 | ||
--- | ||
integration_test: | ||
name: fill_nulls_with_0_multi_metric_query | ||
description: Test a multi-metric query that fills nulls | ||
model: SIMPLE_MODEL | ||
metrics: ["bookings_fill_nulls_with_0", "views"] | ||
group_by_objs: [{"name": "metric_time"}] | ||
check_query: | | ||
SELECT | ||
COALESCE(subq_7.metric_time__day, subq_12.metric_time__day) AS metric_time__day | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are there missing values in this date sequence for the bookings_fill_nulls_with metric? If so that's great. An output test would be nice but at least we can inspect the data frame from here and see it. If not, it might be worth adding a dimension that has a gap for a given metric_time, dimension_value pair. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added a new test case & updated the source data to make sure there were some nulls filled for this case. |
||
, COALESCE(MAX(subq_7.bookings_fill_nulls_with_0), 0) AS bookings_fill_nulls_with_0 | ||
, MAX(subq_12.views) AS views | ||
FROM ( | ||
SELECT | ||
subq_5.ds AS metric_time__day | ||
, COALESCE(subq_3.bookings, 0) AS bookings_fill_nulls_with_0 | ||
FROM {{ source_schema }}.mf_time_spine subq_5 | ||
LEFT OUTER JOIN ( | ||
SELECT | ||
{{ render_date_trunc("ds", TimeGranularity.DAY) }} AS metric_time__day | ||
, SUM(1) AS bookings | ||
FROM {{ source_schema }}.fct_bookings bookings_source_src_1 | ||
GROUP BY metric_time__day | ||
) subq_3 | ||
ON subq_5.ds = subq_3.metric_time__day | ||
) subq_7 | ||
FULL OUTER JOIN ( | ||
SELECT | ||
{{ render_date_trunc("ds", TimeGranularity.DAY) }} AS metric_time__day | ||
, SUM(1) AS views | ||
FROM {{ source_schema }}.fct_views views_source_src_9 | ||
GROUP BY metric_time__day | ||
) subq_12 | ||
ON subq_7.metric_time__day = subq_12.metric_time__day | ||
GROUP BY COALESCE(subq_7.metric_time__day, subq_12.metric_time__day) | ||
--- | ||
integration_test: | ||
name: fill_nulls_with_0_multi_metric_query_with_nesting | ||
description: Test a multi-metric query with a nested join that fills nulls | ||
model: SIMPLE_MODEL | ||
metrics: ["bookings_growth_2_weeks_fill_nulls_with_0_for_non_offset", "booking_value"] | ||
group_by_objs: [{"name": "metric_time"}] | ||
check_query: | | ||
SELECT | ||
COALESCE(subq_17.metric_time__day, subq_22.metric_time__day) AS metric_time__day | ||
, MAX(subq_17.bookings_growth_2_weeks_fill_nulls_with_0_for_non_offset) AS bookings_growth_2_weeks_fill_nulls_with_0_for_non_offset | ||
, MAX(subq_22.booking_value) AS booking_value | ||
FROM ( | ||
SELECT | ||
metric_time__day | ||
, bookings_fill_nulls_with_0 - bookings_2_weeks_ago AS bookings_growth_2_weeks_fill_nulls_with_0_for_non_offset | ||
FROM ( | ||
SELECT | ||
COALESCE(subq_7.metric_time__day, subq_15.metric_time__day) AS metric_time__day | ||
, COALESCE(MAX(subq_7.bookings_fill_nulls_with_0), 0) AS bookings_fill_nulls_with_0 | ||
, MAX(subq_15.bookings_2_weeks_ago) AS bookings_2_weeks_ago | ||
FROM ( | ||
SELECT | ||
metric_time__day | ||
, COALESCE(bookings, 0) AS bookings_fill_nulls_with_0 | ||
FROM ( | ||
SELECT | ||
subq_5.ds AS metric_time__day | ||
, subq_3.bookings AS bookings | ||
FROM {{ source_schema }}.mf_time_spine subq_5 | ||
LEFT OUTER JOIN ( | ||
SELECT | ||
{{ render_date_trunc("ds", TimeGranularity.DAY) }} AS metric_time__day | ||
, SUM(1) AS bookings | ||
FROM {{ source_schema }}.fct_bookings bookings_source_src_1 | ||
GROUP BY metric_time__day | ||
) subq_3 | ||
ON subq_5.ds = subq_3.metric_time__day | ||
) subq_6 | ||
) subq_7 | ||
FULL OUTER JOIN ( | ||
SELECT | ||
subq_11.ds AS metric_time__day | ||
, SUM(subq_9.bookings) AS bookings_2_weeks_ago | ||
FROM {{ source_schema }}.mf_time_spine subq_11 | ||
INNER JOIN ( | ||
SELECT | ||
{{ render_date_trunc("ds", TimeGranularity.DAY) }} AS metric_time__day | ||
, 1 AS bookings | ||
FROM {{ source_schema }}.fct_bookings bookings_source_src_1 | ||
) subq_9 | ||
ON {{ render_date_sub("subq_11", "ds", 14, TimeGranularity.DAY) }} = subq_9.metric_time__day | ||
GROUP BY subq_11.ds | ||
) subq_15 | ||
ON subq_7.metric_time__day = subq_15.metric_time__day | ||
GROUP BY COALESCE(subq_7.metric_time__day, subq_15.metric_time__day) | ||
) subq_16 | ||
) subq_17 | ||
FULL OUTER JOIN ( | ||
SELECT | ||
{{ render_date_trunc("ds", TimeGranularity.DAY) }} AS metric_time__day | ||
, SUM(booking_value) AS booking_value | ||
FROM {{ source_schema }}.fct_bookings bookings_source_src_1 | ||
GROUP BY {{ render_date_trunc("ds", TimeGranularity.DAY) }} | ||
) subq_22 | ||
ON subq_17.metric_time__day = subq_22.metric_time__day | ||
GROUP BY COALESCE(subq_17.metric_time__day, subq_22.metric_time__day) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LOL, thanks. We have so many of these kind of "let's allow a sequence of things and then have exactly one callsite that manually enforces there's only one" interfaces lying around...