Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP #1149

Closed
wants to merge 1 commit into from
Closed

WIP #1149

Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions metricflow/plan_conversion/node_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class MultiHopJoinCandidateLineage:
to get the country dimension.
"""

# Lineage shows you the path to get to this join candidate?
first_node_to_join: BaseOutput
second_node_to_join: BaseOutput
join_second_node_by_entity: LinklessEntitySpec
Expand Down Expand Up @@ -162,8 +163,11 @@ def _get_candidates_nodes_for_multi_hop(
return ()

multi_hop_join_candidates: List[MultiHopJoinCandidate] = []
logger.info(f"Creating nodes for {desired_linkable_spec}")

logger.info(f"Checking multi-hop join candidate nodes for for {desired_linkable_spec}")
# For all right nodes, loop through to see if they can be the first join of 2 hops.
# Then loop through them again to see which ones can be the second join of 2 hops.
# They must contain the proper entities, first off.
# Then check to see if they have the desired linkable specs.
for first_node_that_could_be_joined in nodes:
data_set_of_first_node_that_could_be_joined = self._node_data_set_resolver.get_output_data_set(
first_node_that_could_be_joined
Expand Down Expand Up @@ -211,6 +215,7 @@ def _get_candidates_nodes_for_multi_hop(
# The first and second nodes are joined by this entity
entity_reference_to_join_first_and_second_nodes = desired_linkable_spec.entity_links[1]

# Check for fan-out joins, skip if found.
if not self._join_evaluator.is_valid_instance_set_join(
left_instance_set=data_set_of_first_node_that_could_be_joined.instance_set,
right_instance_set=data_set_of_second_node_that_can_be_joined.instance_set,
Expand All @@ -219,11 +224,15 @@ def _get_candidates_nodes_for_multi_hop(
continue

# filter measures out of joinable_node
# Ok - so measures are never joined to the source node. They are from the source node.
specs = data_set_of_second_node_that_can_be_joined.instance_set.spec_set
filtered_joinable_node = FilterElementsNode(
parent_node=second_node_that_could_be_joined,
include_specs=InstanceSpecSet.from_specs(
specs.dimension_specs + specs.entity_specs + specs.time_dimension_specs
specs.dimension_specs
+ specs.entity_specs
+ specs.time_dimension_specs
+ specs.group_by_metric_specs # add this!!!!
),
)

Expand Down Expand Up @@ -286,10 +295,14 @@ def add_multi_hop_joins(
lineage_for_all_multi_hop_join_candidates: Set[MultiHopJoinCandidateLineage] = set()

for desired_linkable_spec in desired_linkable_specs:
# For each linkable spec, go thru available right nodes to see if they can satisfy this spec.
# next:
for multi_hop_join_candidate in self._get_candidates_nodes_for_multi_hop(
desired_linkable_spec=desired_linkable_spec, nodes=nodes, join_type=join_type
):
# Dedupe candidates that are the same join.
# Why would they be the same join, but not the same node?
# We may never know. Oh well.
if multi_hop_join_candidate.lineage not in lineage_for_all_multi_hop_join_candidates:
all_multi_hop_join_candidates.append(multi_hop_join_candidate)
lineage_for_all_multi_hop_join_candidates.add(multi_hop_join_candidate.lineage)
Expand Down
Loading