-
Notifications
You must be signed in to change notification settings - Fork 97
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bug fix: use `FULL OUTER JOIN` for dimension-only queries
#863
Changes from 1 commit
b8b62aa
44fc845
153e36d
2b03b83
e140dd0
3466709
c65e759
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,7 @@ | |
from metricflow.dataflow.builder.partitions import PartitionJoinResolver | ||
from metricflow.dataflow.dataflow_plan import ( | ||
BaseOutput, | ||
FilterElementsNode, | ||
JoinDescription, | ||
PartitionDimensionJoinDescription, | ||
PartitionTimeDimensionJoinDescription, | ||
|
@@ -37,10 +38,8 @@ | |
from metricflow.model.semantics.semantic_model_join_evaluator import SemanticModelJoinEvaluator | ||
from metricflow.plan_conversion.instance_converters import CreateValidityWindowJoinDescription | ||
from metricflow.protocols.semantics import SemanticModelAccessor | ||
from metricflow.specs.specs import ( | ||
LinkableInstanceSpec, | ||
LinklessEntitySpec, | ||
) | ||
from metricflow.specs.specs import InstanceSpecSet, LinkableInstanceSpec, LinklessEntitySpec | ||
from metricflow.sql.sql_plan import SqlJoinType | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
@@ -55,12 +54,14 @@ class JoinLinkableInstancesRecipe: | |
""" | ||
|
||
node_to_join: BaseOutput | ||
# The entity to join "node_to_join" on. | ||
join_on_entity: LinklessEntitySpec | ||
# The entity to join "node_to_join" on. Not needed for cross-joins. | ||
join_on_entity: Optional[LinklessEntitySpec] | ||
# The linkable instances from the query that can be satisfied if we join this node. Note that this is different from | ||
# the linkable specs in the node that can help to satisfy the query. e.g. "user_id__country" might be one of the | ||
# "satisfiable_linkable_specs", but "country" is the linkable spec in the node. | ||
satisfiable_linkable_specs: List[LinkableInstanceSpec] | ||
# Join type to use when joining node | ||
join_type: SqlJoinType | ||
|
||
# The partitions to join on, if there are matching partitions between the start_node and node_to_join. | ||
join_on_partition_dimensions: Tuple[PartitionDimensionJoinDescription, ...] | ||
|
@@ -71,12 +72,37 @@ class JoinLinkableInstancesRecipe: | |
@property | ||
def join_description(self) -> JoinDescription: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Prior to this PR, this attribute was not used.
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. lolwut |
||
"""The recipe as a join description to use in the dataflow plan node.""" | ||
# Figure out what elements to filter from the joined node. | ||
include_specs: List[LinkableInstanceSpec] = [] | ||
if not self.join_type == SqlJoinType.CROSS_JOIN: | ||
assert all([len(spec.entity_links) > 0 for spec in self.satisfiable_linkable_specs]) | ||
include_specs.extend( | ||
[LinklessEntitySpec.from_reference(spec.entity_links[0]) for spec in self.satisfiable_linkable_specs] | ||
) | ||
|
||
include_specs.extend([join.node_to_join_dimension_spec for join in self.join_on_partition_dimensions]) | ||
include_specs.extend([join.node_to_join_time_dimension_spec for join in self.join_on_partition_time_dimensions]) | ||
if self.validity_window: | ||
include_specs.extend( | ||
[self.validity_window.window_start_dimension, self.validity_window.window_end_dimension] | ||
) | ||
|
||
# `satisfiable_linkable_specs` describes what can be satisfied after the join, so remove the entity | ||
# link when filtering before the join. | ||
# e.g. if the node is used to satisfy "user_id__country", then the node must have the entity | ||
# "user_id" and the "country" dimension so that it can be joined to the source node. | ||
include_specs.extend([spec.without_first_entity_link for spec in self.satisfiable_linkable_specs]) | ||
filtered_node_to_join = FilterElementsNode( | ||
parent_node=self.node_to_join, include_specs=InstanceSpecSet.create_from_linkable_specs(include_specs) | ||
) | ||
|
||
return JoinDescription( | ||
join_node=self.node_to_join, | ||
join_node=filtered_node_to_join, | ||
join_on_entity=self.join_on_entity, | ||
join_on_partition_dimensions=self.join_on_partition_dimensions, | ||
join_on_partition_time_dimensions=self.join_on_partition_time_dimensions, | ||
validity_window=self.validity_window, | ||
join_type=self.join_type, | ||
) | ||
|
||
|
||
|
@@ -133,6 +159,7 @@ def _find_joinable_candidate_nodes_that_can_satisfy_linkable_specs( | |
self, | ||
start_node_instance_set: InstanceSet, | ||
needed_linkable_specs: List[LinkableInstanceSpec], | ||
join_type: SqlJoinType, | ||
) -> List[JoinLinkableInstancesRecipe]: | ||
"""Get nodes that can be joined to get 1 or more of the "needed_linkable_specs". | ||
|
||
|
@@ -257,6 +284,7 @@ def _find_joinable_candidate_nodes_that_can_satisfy_linkable_specs( | |
join_on_partition_dimensions=join_on_partition_dimensions, | ||
join_on_partition_time_dimensions=join_on_partition_time_dimensions, | ||
validity_window=validity_window_join_description, | ||
join_type=join_type, | ||
) | ||
) | ||
|
||
|
@@ -271,6 +299,7 @@ def _find_joinable_candidate_nodes_that_can_satisfy_linkable_specs( | |
def _update_candidates_that_can_satisfy_linkable_specs( | ||
candidates_for_join: List[JoinLinkableInstancesRecipe], | ||
already_satisfisfied_linkable_specs: List[LinkableInstanceSpec], | ||
join_type: SqlJoinType, | ||
) -> List[JoinLinkableInstancesRecipe]: | ||
"""Update / filter candidates_for_join based on linkable instance specs that we have already satisfied. | ||
|
||
|
@@ -294,6 +323,7 @@ def _update_candidates_that_can_satisfy_linkable_specs( | |
join_on_partition_dimensions=candidate_for_join.join_on_partition_dimensions, | ||
join_on_partition_time_dimensions=candidate_for_join.join_on_partition_time_dimensions, | ||
validity_window=candidate_for_join.validity_window, | ||
join_type=join_type, | ||
) | ||
) | ||
return sorted( | ||
|
@@ -306,6 +336,7 @@ def evaluate_node( | |
self, | ||
start_node: BaseOutput, | ||
required_linkable_specs: Sequence[LinkableInstanceSpec], | ||
default_join_type: SqlJoinType, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why is this
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This was in preparation for my next PR - we'll need to use
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ... which will override the default join type for dimension-only queries. |
||
) -> LinkableInstanceSatisfiabilityEvaluation: | ||
"""Evaluates if the "required_linkable_specs" can be realized by joining the "start_node" with other nodes. | ||
|
||
|
@@ -345,7 +376,9 @@ def evaluate_node( | |
possibly_joinable_linkable_specs.append(required_linkable_spec) | ||
|
||
candidates_for_join = self._find_joinable_candidate_nodes_that_can_satisfy_linkable_specs( | ||
start_node_instance_set=candidate_instance_set, needed_linkable_specs=possibly_joinable_linkable_specs | ||
start_node_instance_set=candidate_instance_set, | ||
needed_linkable_specs=possibly_joinable_linkable_specs, | ||
join_type=default_join_type, | ||
) | ||
join_candidates: List[JoinLinkableInstancesRecipe] = [] | ||
|
||
|
@@ -378,6 +411,7 @@ def evaluate_node( | |
candidates_for_join = self._update_candidates_that_can_satisfy_linkable_specs( | ||
candidates_for_join=candidates_for_join, | ||
already_satisfisfied_linkable_specs=next_candidate.satisfiable_linkable_specs, | ||
join_type=default_join_type, | ||
) | ||
|
||
# The once possibly joinable specs are definitely joinable and no longer need to be searched for. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved this logic to `JoinLinkableInstancesRecipe`, which seemed cleaner & more appropriate.