Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use correct join node specs for GroupByMetrics #1193

Merged
merged 9 commits into from
May 10, 2024
27 changes: 16 additions & 11 deletions metricflow/dataflow/builder/node_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,16 @@ def __post_init__(self) -> None: # noqa: D105
if self.join_on_entity is None and self.join_type != SqlJoinType.CROSS_JOIN:
raise RuntimeError("`join_on_entity` is required unless using CROSS JOIN.")

# TODO: JoinDescription is very similar to JoinLinkableInstancesRecipe. Can we consolidate by just adding a
# `filtered_node_to_join` property on JoinLinkableInstancesRecipe?
@property
def join_description(self) -> JoinDescription:
"""The recipe as a join description to use in the dataflow plan node."""
# Figure out what elements to filter from the joined node.
"""The recipe as a join description to use in the dataflow plan node.

Here, we figure out which instance specs to keep from this node in order to join to it and render its
satisfiable linkable specs, e.g. if the node is used to satisfy "user_id__country", the node must have the
entity "user_id" and the "country" dimension so that it can be joined to the source node.
"""
include_specs: List[LinkableInstanceSpec] = []
assert all(
[
Expand All @@ -92,13 +98,13 @@ def join_description(self) -> JoinDescription:
]
)

include_specs.extend(
[
LinklessEntitySpec.from_reference(spec.entity_links[0])
for spec in self.satisfiable_linkable_specs
if len(spec.entity_links) > 0
]
)
# Get the specs needed to join onto this node.
if self.node_to_join.aggregated_to_elements:
include_specs.extend(self.node_to_join.aggregated_to_elements)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we want these as-is or did we want to convert them to the LinklessEntitySpec types at this point?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want them as-is. The reason they were always LinklessEntitySpecs previously is that we were always joining to source nodes (which would not have joins contained WITHIN them). In this case, we are joining to a node that has joins within it, so its instance specs might have entity links.

else:
for spec in self.satisfiable_linkable_specs:
if len(spec.entity_links) > 0:
include_specs.append(LinklessEntitySpec.from_reference(spec.entity_links[0]))

include_specs.extend([join.node_to_join_dimension_spec for join in self.join_on_partition_dimensions])
include_specs.extend([join.node_to_join_time_dimension_spec for join in self.join_on_partition_time_dimensions])
Expand All @@ -109,14 +115,13 @@ def join_description(self) -> JoinDescription:

# `satisfiable_linkable_specs` describes what can be satisfied after the join, so remove the entity
# link when filtering before the join.
# e.g. if the node is used to satisfy "user_id__country", then the node must have the entity
# "user_id" and the "country" dimension so that it can be joined to the source node.
include_specs.extend(
[
spec.without_first_entity_link if len(spec.entity_links) > 0 else spec
for spec in self.satisfiable_linkable_specs
]
)

filtered_node_to_join = FilterElementsNode(
parent_node=self.node_to_join, include_specs=group_specs_by_type(include_specs)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
<FilterElementsNode>
<!-- description = "Pass Only Elements: ['listing', 'listing__bookings']" -->
<!-- node_id = NodeId(id_str='pfe_2') -->
<!-- include_spec = LinklessEntitySpec(element_name='listing') -->
<!-- include_spec = EntitySpec(element_name='listing') -->
<!-- include_spec = -->
<!-- GroupByMetricSpec( -->
<!-- element_name='bookings', -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
<FilterElementsNode>
<!-- description = "Pass Only Elements: ['listing', 'listing__bookings']" -->
<!-- node_id = NodeId(id_str='pfe_2') -->
<!-- include_spec = LinklessEntitySpec(element_name='listing') -->
<!-- include_spec = EntitySpec(element_name='listing') -->
<!-- include_spec = -->
<!-- GroupByMetricSpec( -->
<!-- element_name='bookings', -->
Expand Down