Skip to content

Commit

Permalink
Switch to FULL OUTER JOIN for derived & ratio metrics (#842)
Browse files Browse the repository at this point in the history
  • Loading branch information
courtneyholcomb authored Nov 4, 2023
1 parent 151975b commit 38adb04
Show file tree
Hide file tree
Showing 197 changed files with 1,761 additions and 2,967 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Breaking Changes-20231102-182815.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Breaking Changes
body: Use FULL OUTER JOIN to combine input metrics for derived metrics. This is a change from using INNER JOIN and may result in changes in output.
time: 2023-11-02T18:28:15.181064-07:00
custom:
Author: courtneyholcomb
Issue: "842"
9 changes: 1 addition & 8 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,6 @@ def _build_metrics_output_node(
queried_linkable_specs: LinkableSpecSet,
where_constraint: Optional[WhereFilterSpec] = None,
time_range_constraint: Optional[TimeRangeConstraint] = None,
combine_metrics_join_type: SqlJoinType = SqlJoinType.FULL_OUTER,
) -> BaseOutput:
"""Builds a computed metrics output node.
Expand All @@ -222,7 +221,6 @@ def _build_metrics_output_node(
queried_linkable_specs: Dimensions/entities that were queried for.
where_constraint: Where constraint used to compute the metric.
time_range_constraint: Time range constraint used to compute the metric.
combine_metrics_join_type: The join used when combining the computed metrics.
"""
output_nodes: List[BaseOutput] = []
compute_metrics_node: Optional[ComputeMetricsNode] = None
Expand All @@ -241,14 +239,12 @@ def _build_metrics_output_node(
f"For {metric.type} metric: {metric_spec}, needed metrics are:\n"
f"{pformat_big_objects(metric_input_specs=metric_input_specs)}"
)

compute_metrics_node = ComputeMetricsNode(
parent_node=self._build_metrics_output_node(
metric_specs=metric_input_specs,
queried_linkable_specs=queried_linkable_specs,
where_constraint=where_constraint,
time_range_constraint=time_range_constraint,
combine_metrics_join_type=SqlJoinType.INNER,
),
metric_specs=[metric_spec],
)
Expand Down Expand Up @@ -295,10 +291,7 @@ def _build_metrics_output_node(
if len(output_nodes) == 1:
return output_nodes[0]

return CombineMetricsNode(
parent_nodes=output_nodes,
join_type=combine_metrics_join_type,
)
return CombineMetricsNode(parent_nodes=output_nodes)

def build_plan_for_distinct_values(self, query_spec: MetricFlowQuerySpec) -> DataflowPlan:
"""Generate a plan that would get the distinct values of a linkable instance.
Expand Down
25 changes: 2 additions & 23 deletions metricflow/dataflow/dataflow_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -1225,9 +1225,7 @@ class CombineMetricsNode(ComputedMetricsOutput):
def __init__( # noqa: D
self,
parent_nodes: Sequence[Union[BaseOutput, ComputedMetricsOutput]],
join_type: SqlJoinType = SqlJoinType.FULL_OUTER,
) -> None:
self._join_type = join_type
super().__init__(node_id=self.create_unique_id(), parent_nodes=list(parent_nodes))

@classmethod
Expand All @@ -1241,31 +1239,12 @@ def accept(self, visitor: DataflowPlanNodeVisitor[VisitorOutputT]) -> VisitorOut
def description(self) -> str: # noqa: D
return "Combine Metrics"

@property
def displayed_properties(self) -> List[DisplayedProperty]:
"""Prints details about the join types and how the node will behave."""
custom_properties = [DisplayedProperty("join type", self.join_type)]
if self.join_type is SqlJoinType.FULL_OUTER:
custom_properties.append(
DisplayedProperty("de-duplication method", "post-join aggregation across all dimensions")
)

return super().displayed_properties + custom_properties

@property
def join_type(self) -> SqlJoinType:
"""The type of join used for combining metrics."""
return self._join_type

def functionally_identical(self, other_node: DataflowPlanNode) -> bool: # noqa: D
return isinstance(other_node, self.__class__) and other_node.join_type == self.join_type
return isinstance(other_node, self.__class__)

def with_new_parents(self, new_parent_nodes: Sequence[BaseOutput]) -> CombineMetricsNode: # noqa: D
assert len(new_parent_nodes) == 1
return CombineMetricsNode(
parent_nodes=new_parent_nodes,
join_type=self.join_type,
)
return CombineMetricsNode(parent_nodes=new_parent_nodes)


class ConstrainTimeRangeNode(AggregatedMeasuresOutput, BaseOutput):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,7 @@ def visit_combine_metrics_node(self, node: CombineMetricsNode) -> OptimizeBranch
if len(combined_parent_branches) == 1:
return OptimizeBranchResult(base_output_node=combined_parent_branches[0])

return OptimizeBranchResult(
base_output_node=CombineMetricsNode(parent_nodes=combined_parent_branches, join_type=node.join_type)
)
return OptimizeBranchResult(base_output_node=CombineMetricsNode(parent_nodes=combined_parent_branches))

def visit_constrain_time_range_node(self, node: ConstrainTimeRangeNode) -> OptimizeBranchResult: # noqa: D
self._log_visit_node_type(node)
Expand Down
14 changes: 6 additions & 8 deletions metricflow/plan_conversion/dataflow_to_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -906,12 +906,10 @@ def visit_combine_metrics_node(self, node: CombineMetricsNode) -> SqlDataSet:
"""Join computed metric datasets together to return a single dataset containing all metrics.
This node may exist in one of two situations: when metrics need to be combined in order to produce a single
dataset with all required inputs for a derived metric (in which case the join type is INNER), or when
metrics need to be combined in order to produce a single dataset of output for downstream consumption by
the end user, in which case we will use FULL OUTER JOIN.
dataset with all required inputs for a derived metric, or when metrics need to be combined in order to produce
a single dataset of output for downstream consumption by the end user.
In the case of a multi-data-source FULL OUTER JOIN the join key will be a coalesced set of all previously
seen dimension values. For example:
The join key will be a coalesced set of all previously seen dimension values. For example:
FROM (
...
) subq_9
Expand Down Expand Up @@ -961,7 +959,7 @@ def visit_combine_metrics_node(self, node: CombineMetricsNode) -> SqlDataSet:
), "All parent nodes should have the same set of linkable instances since all values are coalesced."

linkable_spec_set = from_data_set.data_set.instance_set.spec_set.transform(SelectOnlyLinkableSpecs())
join_type = SqlJoinType.CROSS_JOIN if len(linkable_spec_set.all_specs) == 0 else node.join_type
join_type = SqlJoinType.CROSS_JOIN if len(linkable_spec_set.all_specs) == 0 else SqlJoinType.FULL_OUTER

joins_descriptions: List[SqlJoinDescription] = []
# TODO: refactor this loop into SqlQueryPlanJoinBuilder
Expand All @@ -986,7 +984,7 @@ def visit_combine_metrics_node(self, node: CombineMetricsNode) -> SqlDataSet:
output_instance_set = InstanceSet.merge([x.data_set.instance_set for x in parent_data_sets])
output_instance_set = output_instance_set.transform(ChangeAssociatedColumns(self._column_association_resolver))

metric_aggregation_type = AggregationType.MAX if node.join_type is SqlJoinType.FULL_OUTER else None
metric_aggregation_type = AggregationType.MAX
metric_select_column_set = SelectColumnSet(
metric_columns=self._make_select_columns_for_metrics(
table_alias_to_metric_specs, aggregation_type=metric_aggregation_type
Expand All @@ -1008,7 +1006,7 @@ def visit_combine_metrics_node(self, node: CombineMetricsNode) -> SqlDataSet:
from_source=from_data_set.data_set.sql_select_node,
from_source_alias=from_data_set.alias,
joins_descs=tuple(joins_descriptions),
group_bys=linkable_select_column_set.as_tuple() if node.join_type is SqlJoinType.FULL_OUTER else (),
group_bys=linkable_select_column_set.as_tuple(),
where=None,
order_bys=(),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ integration_test:
WHERE is_instant
GROUP BY ds
) a
JOIN (
FULL OUTER JOIN (
SELECT
CAST(NULLIF(MAX(booking_value), 0) AS {{ double_data_type_name }} ) AS max_booking_value
, ds
Expand Down Expand Up @@ -48,7 +48,7 @@ integration_test:
WHERE listings_latest.is_lux
GROUP BY fct_bookings.ds
) a
JOIN (
FULL OUTER JOIN (
SELECT
CAST(NULLIF(MAX(booking_value), 0) AS {{ double_data_type_name }} ) AS max_booking_value
, ds
Expand Down Expand Up @@ -79,7 +79,7 @@ integration_test:
WHERE listings_latest.is_lux
GROUP BY fct_bookings.ds
) a
JOIN (
FULL OUTER JOIN (
SELECT
CAST(NULLIF(SUM(booking_value), 0) AS {{ double_data_type_name }} ) AS booking_value
, ds
Expand Down Expand Up @@ -107,7 +107,7 @@ integration_test:
WHERE is_instant
GROUP BY ds
) a
JOIN (
FULL OUTER JOIN (
SELECT
CAST(NULLIF(SUM(booking_value), 0) AS {{ double_data_type_name }} ) AS booking_value
, ds
Expand Down Expand Up @@ -153,7 +153,7 @@ integration_test:
WHERE dul_west.home_state_latest IN ('CA', 'HI', 'WA')
GROUP BY fa_west_filtered.ds
) a
JOIN (
FULL OUTER JOIN (
SELECT
CAST(SUM(account_balance) AS {{ double_data_type_name }}) AS total_account_balance_first_day
, fa_east_filtered.ds
Expand Down
75 changes: 41 additions & 34 deletions metricflow/test/integration/test_cases/itest_metrics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ integration_test:
FROM {{source_schema}}.fct_bookings
GROUP BY ds
) b
JOIN (
FULL OUTER JOIN (
SELECT
SUM(1) AS views
, ds
Expand Down Expand Up @@ -284,7 +284,7 @@ integration_test:
GROUP BY
ds
) groupby_8cbdaa28
JOIN (
FULL OUTER JOIN (
SELECT
SUM(1) AS views
, ds
Expand Down Expand Up @@ -350,7 +350,7 @@ integration_test:
GROUP BY
ds
) groupby_8cbdaa28
JOIN (
FULL OUTER JOIN (
SELECT
SUM(1) AS listings
, created_at AS ds
Expand Down Expand Up @@ -519,7 +519,7 @@ integration_test:
GROUP BY
ds
) a
INNER JOIN (
FULL OUTER JOIN (
SELECT
SUM(1) AS lux_listings
, created_at AS metric_time__day
Expand Down Expand Up @@ -643,27 +643,34 @@ integration_test:
group_bys: [listing__is_lux_latest]
check_query: |
SELECT
bk.booking_value / NULLIF(vw.views, 0) AS booking_value_per_view
, bk.is_lux AS listing__is_lux_latest
booking_value / NULLIF(views, 0) AS booking_value_per_view
, listing__is_lux_latest
FROM (
SELECT
SUM(a.booking_value) AS booking_value
,b.is_lux
FROM {{ source_schema }}.fct_bookings a
LEFT OUTER JOIN {{ source_schema }}.dim_listings_latest b
ON a.listing_id = b.listing_id
GROUP BY 2
) bk
INNER JOIN (
SELECT
SUM(1) AS views
,d.is_lux
FROM {{ source_schema }}.fct_views c
LEFT OUTER JOIN {{ source_schema }}.dim_listings_latest d
ON c.listing_id = d.listing_id
GROUP BY 2
) vw
ON bk.is_lux = vw.is_lux OR (bk.is_lux IS NULL AND vw.is_lux IS NULL)
MAX(bk.booking_value) AS booking_value
, MAX(vw.views) AS views
, COALESCE(bk.is_lux, vw.is_lux) AS listing__is_lux_latest
FROM (
SELECT
SUM(a.booking_value) AS booking_value
,b.is_lux
FROM {{ source_schema }}.fct_bookings a
LEFT OUTER JOIN {{ source_schema }}.dim_listings_latest b
ON a.listing_id = b.listing_id
GROUP BY 2
) bk
FULL OUTER JOIN (
SELECT
SUM(1) AS views
,d.is_lux
FROM {{ source_schema }}.fct_views c
LEFT OUTER JOIN {{ source_schema }}.dim_listings_latest d
ON c.listing_id = d.listing_id
GROUP BY 2
) vw
ON bk.is_lux = vw.is_lux
GROUP BY 3
) x
---
integration_test:
name: derived_metric_with_offset_window
Expand All @@ -683,7 +690,7 @@ integration_test:
GROUP BY
metric_time__day
) a
INNER JOIN (
FULL OUTER JOIN (
SELECT
c.ds AS metric_time__day
, d.bookings_2_weeks_ago AS bookings_2_weeks_ago
Expand Down Expand Up @@ -718,7 +725,7 @@ integration_test:
GROUP BY
metric_time__day
) a
INNER JOIN (
FULL OUTER JOIN (
SELECT
c.ds AS metric_time__day
, d.bookings_at_start_of_month AS bookings_at_start_of_month
Expand Down Expand Up @@ -760,7 +767,7 @@ integration_test:
) f
ON {{ render_date_sub("g", "ds", 1, TimeGranularity.MONTH) }} = f.metric_time__day
) a
INNER JOIN (
FULL OUTER JOIN (
SELECT
c.ds AS metric_time__day
, d.bookings AS month_start_bookings
Expand Down Expand Up @@ -808,7 +815,7 @@ integration_test:
check_query: |
SELECT
booking_value - instant_booking_value AS booking_value_sub_instant
, a.metric_time__day
, COALESCE(a.metric_time__day, b.metric_time__day) AS metric_time__day
FROM (
SELECT
SUM(booking_value) AS instant_booking_value
Expand All @@ -818,7 +825,7 @@ integration_test:
GROUP BY
ds
) a
INNER JOIN (
FULL OUTER JOIN (
SELECT
SUM(booking_value) AS booking_value
, ds AS metric_time__day
Expand All @@ -843,7 +850,7 @@ integration_test:
FROM (
SELECT
booking_value - instant_booking_value AS booking_value_sub_instant
, a.metric_time__day AS metric_time__day
, COALESCE(a.metric_time__day, b.metric_time__day) AS metric_time__day
FROM (
SELECT
SUM(booking_value) AS instant_booking_value
Expand All @@ -853,7 +860,7 @@ integration_test:
GROUP BY
ds
) a
INNER JOIN (
FULL OUTER JOIN (
SELECT
SUM(booking_value) AS booking_value
, ds AS metric_time__day
Expand Down Expand Up @@ -901,7 +908,7 @@ integration_test:
FROM {{ source_schema }}.fct_bookings
GROUP BY metric_time__week
) a
INNER JOIN (
FULL OUTER JOIN (
SELECT
{{ render_date_trunc("c.ds", TimeGranularity.WEEK) }} AS metric_time__week
, SUM(d.bookings_at_start_of_month) AS bookings_at_start_of_month
Expand Down Expand Up @@ -944,7 +951,7 @@ integration_test:
ON {{ render_date_sub("g", "ds", 1, TimeGranularity.MONTH) }} = f.metric_time__day
GROUP BY metric_time__year
) a
INNER JOIN (
FULL OUTER JOIN (
SELECT
{{ render_date_trunc("c.ds", TimeGranularity.YEAR) }} AS metric_time__year
, SUM(d.bookings) AS month_start_bookings
Expand Down Expand Up @@ -1024,7 +1031,7 @@ integration_test:
FROM {{ source_schema }}.fct_bookings
GROUP BY metric_time__week, metric_time__month
) a
INNER JOIN (
FULL OUTER JOIN (
SELECT
{{ render_date_trunc("c.ds", TimeGranularity.WEEK) }} AS metric_time__week
, {{ render_date_trunc("c.ds", TimeGranularity.MONTH) }} AS metric_time__month
Expand Down Expand Up @@ -1290,7 +1297,7 @@ integration_test:
) subq_3
ON subq_5.ds = subq_3.metric_time__day
) subq_7
INNER JOIN (
FULL OUTER JOIN (
SELECT
subq_11.ds AS metric_time__day
, SUM(subq_9.bookings) AS bookings_2_weeks_ago
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
<CombineMetricsNode>
<!-- description = Combine Metrics -->
<!-- node_id = cbm_0 -->
<!-- join type = SqlJoinType.FULL_OUTER -->
<!-- de-duplication method = post-join aggregation across all dimensions -->
<ComputeMetricsNode>
<!-- description = Compute Metrics via Expressions -->
<!-- node_id = cm_0 -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
<CombineMetricsNode>
<!-- description = Combine Metrics -->
<!-- node_id = cbm_0 -->
<!-- join type = SqlJoinType.INNER -->
<ComputeMetricsNode>
<!-- description = Compute Metrics via Expressions -->
<!-- node_id = cm_0 -->
Expand Down
Loading

0 comments on commit 38adb04

Please sign in to comment.